IQuery::Synchronous support in RemoteLibrary.

This commit is contained in:
casey langen 2020-10-06 21:43:45 -07:00
parent 72e8895aa9
commit ecd68a6057
2 changed files with 24 additions and 21 deletions

View File

@ -115,7 +115,7 @@ void RemoteLibrary::Close() {
std::thread* thread = nullptr; std::thread* thread = nullptr;
{ {
std::unique_lock<std::recursive_mutex> lock(this->mutex); std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
if (this->thread) { if (this->thread) {
thread = this->thread; thread = this->thread;
this->thread = nullptr; this->thread = nullptr;
@ -126,6 +126,7 @@ void RemoteLibrary::Close() {
if (thread) { if (thread) {
this->queueCondition.notify_all(); this->queueCondition.notify_all();
this->syncQueryCondition.notify_all();
thread->join(); thread->join();
delete thread; delete thread;
} }
@ -144,20 +145,20 @@ int RemoteLibrary::Enqueue(QueryPtr query, unsigned int options, Callback callba
auto serializableQuery = std::dynamic_pointer_cast<ISerializableQuery>(query); auto serializableQuery = std::dynamic_pointer_cast<ISerializableQuery>(query);
if (serializableQuery) { if (serializableQuery) {
std::unique_lock<std::recursive_mutex> lock(this->mutex);
if (this->exit) { /* closed */
return -1;
}
auto context = std::make_shared<QueryContext>(); auto context = std::make_shared<QueryContext>();
context->query = serializableQuery; context->query = serializableQuery;
context->callback = callback; context->callback = callback;
if (options & ILibrary::QuerySynchronous) { if (options & ILibrary::QuerySynchronous) {
this->RunQuery(context, false); /* false = do not notify via QueryCompleted */ this->RunQuery(context); /* false = do not notify via QueryCompleted */
std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
while (!this->exit && context->query->GetStatus() == IQuery::Idle) {
this->syncQueryCondition.wait(lock);
}
} }
else { else {
std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
if (this->exit) { return -1; }
queryQueue.push_back(context); queryQueue.push_back(context);
queueCondition.notify_all(); queueCondition.notify_all();
} }
@ -169,7 +170,7 @@ int RemoteLibrary::Enqueue(QueryPtr query, unsigned int options, Callback callba
} }
RemoteLibrary::QueryContextPtr RemoteLibrary::GetNextQuery() { RemoteLibrary::QueryContextPtr RemoteLibrary::GetNextQuery() {
std::unique_lock<std::recursive_mutex> lock(this->mutex); std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
while (!this->queryQueue.size() && !this->exit) { while (!this->queryQueue.size() && !this->exit) {
this->queueCondition.wait(lock); this->queueCondition.wait(lock);
} }
@ -214,7 +215,7 @@ void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query)
QueryContextPtr context; QueryContextPtr context;
{ {
std::unique_lock<std::recursive_mutex> lock(this->mutex); std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
context = queriesInFlight[messageId]; context = queriesInFlight[messageId];
queriesInFlight.erase(messageId); queriesInFlight.erase(messageId);
} }
@ -222,17 +223,19 @@ void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query)
if (context) { if (context) {
this->OnQueryCompleted(context); this->OnQueryCompleted(context);
} }
this->syncQueryCondition.notify_all();
} }
void RemoteLibrary::RunQuery(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQuery(QueryContextPtr context) {
#if 0 #if 0
this->RunQueryOnLoopback(context, notify); this->RunQueryOnLoopback(context);
#else #else
this->RunQueryOnWebSocketClient(context, notify); this->RunQueryOnWebSocketClient(context);
#endif #endif
} }
void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context) {
if (context) { if (context) {
/* do everything via loopback to the local library for testing. we do, however, /* do everything via loopback to the local library for testing. we do, however,
go through the motions by serializing the inbound query to a string, then go through the motions by serializing the inbound query to a string, then
@ -263,9 +266,9 @@ void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) {
} }
} }
void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context) {
if (context->query) { if (context->query) {
std::unique_lock<std::recursive_mutex> lock(this->mutex); std::unique_lock<std::recursive_mutex> lock(this->queueMutex);
const std::string messageId = wsc.EnqueueQuery(context->query); const std::string messageId = wsc.EnqueueQuery(context->query);
if (messageId.size()) { if (messageId.size()) {
queriesInFlight[messageId] = context; queriesInFlight[messageId] = context;

View File

@ -100,9 +100,9 @@ namespace musik { namespace core { namespace library {
RemoteLibrary(std::string name, int id); /* ctor */ RemoteLibrary(std::string name, int id); /* ctor */
void RunQuery(QueryContextPtr context, bool notify = true); void RunQuery(QueryContextPtr context);
void RunQueryOnLoopback(QueryContextPtr context, bool notify); void RunQueryOnLoopback(QueryContextPtr context);
void RunQueryOnWebSocketClient(QueryContextPtr context, bool notify); void RunQueryOnWebSocketClient(QueryContextPtr context);
void OnQueryCompleted(const std::string& messageId, Query query); void OnQueryCompleted(const std::string& messageId, Query query);
void OnQueryCompleted(QueryContextPtr context); void OnQueryCompleted(QueryContextPtr context);
@ -123,8 +123,8 @@ namespace musik { namespace core { namespace library {
std::unordered_map<std::string, QueryContextPtr> queriesInFlight; std::unordered_map<std::string, QueryContextPtr> queriesInFlight;
std::thread* thread; std::thread* thread;
std::condition_variable_any queueCondition; std::condition_variable_any queueCondition, syncQueryCondition;
std::recursive_mutex mutex; std::recursive_mutex queueMutex;
std::atomic<bool> exit; std::atomic<bool> exit;
}; };