From ecd68a6057f966c26e3389bfd2064bb6af1dc80c Mon Sep 17 00:00:00 2001 From: casey langen Date: Tue, 6 Oct 2020 21:43:45 -0700 Subject: [PATCH] IQuery::Synchronous support in RemoteLibrary. --- src/core/library/RemoteLibrary.cpp | 35 ++++++++++++++++-------------- src/core/library/RemoteLibrary.h | 10 ++++----- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/core/library/RemoteLibrary.cpp b/src/core/library/RemoteLibrary.cpp index 3e5d9ed8d..4b8bb13bc 100644 --- a/src/core/library/RemoteLibrary.cpp +++ b/src/core/library/RemoteLibrary.cpp @@ -115,7 +115,7 @@ void RemoteLibrary::Close() { std::thread* thread = nullptr; { - std::unique_lock lock(this->mutex); + std::unique_lock lock(this->queueMutex); if (this->thread) { thread = this->thread; this->thread = nullptr; @@ -126,6 +126,7 @@ void RemoteLibrary::Close() { if (thread) { this->queueCondition.notify_all(); + this->syncQueryCondition.notify_all(); thread->join(); delete thread; } @@ -144,20 +145,20 @@ int RemoteLibrary::Enqueue(QueryPtr query, unsigned int options, Callback callba auto serializableQuery = std::dynamic_pointer_cast(query); if (serializableQuery) { - std::unique_lock lock(this->mutex); - - if (this->exit) { /* closed */ - return -1; - } - auto context = std::make_shared(); context->query = serializableQuery; context->callback = callback; if (options & ILibrary::QuerySynchronous) { - this->RunQuery(context, false); /* false = do not notify via QueryCompleted */ + this->RunQuery(context); /* false = do not notify via QueryCompleted */ + std::unique_lock lock(this->queueMutex); + while (!this->exit && context->query->GetStatus() == IQuery::Idle) { + this->syncQueryCondition.wait(lock); + } } else { + std::unique_lock lock(this->queueMutex); + if (this->exit) { return -1; } queryQueue.push_back(context); queueCondition.notify_all(); } @@ -169,7 +170,7 @@ int RemoteLibrary::Enqueue(QueryPtr query, unsigned int options, Callback callba } RemoteLibrary::QueryContextPtr RemoteLibrary::GetNextQuery() { - std::unique_lock lock(this->mutex); + std::unique_lock lock(this->queueMutex); while (!this->queryQueue.size() && !this->exit) { this->queueCondition.wait(lock); } @@ -214,7 +215,7 @@ void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query) QueryContextPtr context; { - std::unique_lock lock(this->mutex); + std::unique_lock lock(this->queueMutex); context = queriesInFlight[messageId]; queriesInFlight.erase(messageId); } @@ -222,17 +223,19 @@ void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query) if (context) { this->OnQueryCompleted(context); } + + this->syncQueryCondition.notify_all(); } -void RemoteLibrary::RunQuery(QueryContextPtr context, bool notify) { +void RemoteLibrary::RunQuery(QueryContextPtr context) { #if 0 - this->RunQueryOnLoopback(context, notify); + this->RunQueryOnLoopback(context); #else - this->RunQueryOnWebSocketClient(context, notify); + this->RunQueryOnWebSocketClient(context); #endif } -void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) { +void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context) { if (context) { /* do everything via loopback to the local library for testing. we do, however, go through the motions by serializing the inbound query to a string, then @@ -263,9 +266,9 @@ void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) { } } -void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context, bool notify) { +void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context) { if (context->query) { - std::unique_lock lock(this->mutex); + std::unique_lock lock(this->queueMutex); const std::string messageId = wsc.EnqueueQuery(context->query); if (messageId.size()) { queriesInFlight[messageId] = context; diff --git a/src/core/library/RemoteLibrary.h b/src/core/library/RemoteLibrary.h index a842d2d0b..c65e9e3bf 100644 --- a/src/core/library/RemoteLibrary.h +++ b/src/core/library/RemoteLibrary.h @@ -100,9 +100,9 @@ namespace musik { namespace core { namespace library { RemoteLibrary(std::string name, int id); /* ctor */ - void RunQuery(QueryContextPtr context, bool notify = true); - void RunQueryOnLoopback(QueryContextPtr context, bool notify); - void RunQueryOnWebSocketClient(QueryContextPtr context, bool notify); + void RunQuery(QueryContextPtr context); + void RunQueryOnLoopback(QueryContextPtr context); + void RunQueryOnWebSocketClient(QueryContextPtr context); void OnQueryCompleted(const std::string& messageId, Query query); void OnQueryCompleted(QueryContextPtr context); @@ -123,8 +123,8 @@ namespace musik { namespace core { namespace library { std::unordered_map queriesInFlight; std::thread* thread; - std::condition_variable_any queueCondition; - std::recursive_mutex mutex; + std::condition_variable_any queueCondition, syncQueryCondition; + std::recursive_mutex queueMutex; std::atomic exit; };