From 72e8895aa9688e51baa75822fa93c68fcac14c88 Mon Sep 17 00:00:00 2001 From: casey langen Date: Tue, 6 Oct 2020 21:29:08 -0700 Subject: [PATCH] Kind of sort of working now. Need to get synchronous queries working, even though they are generally undesireable. --- src/core/library/RemoteLibrary.cpp | 79 ++++++++++++++++++------------ src/core/library/RemoteLibrary.h | 8 ++- src/core/net/WebSocketClient.cpp | 19 +++---- 3 files changed, 63 insertions(+), 43 deletions(-) diff --git a/src/core/library/RemoteLibrary.cpp b/src/core/library/RemoteLibrary.cpp index 7569a2af9..3e5d9ed8d 100644 --- a/src/core/library/RemoteLibrary.cpp +++ b/src/core/library/RemoteLibrary.cpp @@ -192,61 +192,84 @@ void RemoteLibrary::ThreadProc() { } } +void RemoteLibrary::NotifyQueryCompleted(QueryContextPtr context) { + this->QueryCompleted(context->query.get()); + if (context->callback) { + context->callback(context->query); + } +} + +void RemoteLibrary::OnQueryCompleted(QueryContextPtr context) { + if (context) { + if (this->messageQueue) { + this->messageQueue->Post(std::make_shared(this, context)); + } + else { + this->NotifyQueryCompleted(context); + } + } +} + +void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query) { + QueryContextPtr context; + + { + std::unique_lock lock(this->mutex); + context = queriesInFlight[messageId]; + queriesInFlight.erase(messageId); + } + + if (context) { + this->OnQueryCompleted(context); + } +} + void RemoteLibrary::RunQuery(QueryContextPtr context, bool notify) { +#if 0 this->RunQueryOnLoopback(context, notify); +#else + this->RunQueryOnWebSocketClient(context, notify); +#endif } void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) { if (context) { - /* CAL TODO: eventually make the request over a websocket. right now we just - do everything via loopback to the local library for testing. we do, however, + /* do everything via loopback to the local library for testing. we do, however, go through the motions by serializing the inbound query to a string, then bouncing through the QueryRegister to create a new instance, run the query locally, serialize the result, then deserialize it again to emulate the entire flow. */ - wsc.EnqueueQuery(context->query); - auto localLibrary = LibraryFactory::Instance().Default(); localLibrary->SetMessageQueue(*this->messageQueue); auto localQuery = QueryRegistry::CreateLocalQueryFor( context->query->Name(), context->query->SerializeQuery(), localLibrary); - auto onComplete = [this, notify, context, localQuery]() { - if (notify) { - if (this->messageQueue) { - this->messageQueue->Post(std::make_shared(this, context)); - } - else { - this->QueryCompleted(localQuery.get()); - } - } - else if (context->callback) { - context->callback(context->query); - } - }; - if (!localQuery) { - onComplete(); + OnQueryCompleted(context); return; } localLibrary->Enqueue( localQuery, ILibrary::QuerySynchronous, /* CAL TODO: make async! we have to make TrackList support async lookup first tho. */ - [context, onComplete, localQuery](auto result) { + [this, context, localQuery](auto result) { if (localQuery->GetStatus() == IQuery::Finished) { context->query->DeserializeResult(localQuery->SerializeResult()); } - onComplete(); + this->OnQueryCompleted(context); }); } } void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context, bool notify) { if (context->query) { - wsc.EnqueueQuery(context->query); + std::unique_lock lock(this->mutex); + const std::string messageId = wsc.EnqueueQuery(context->query); + if (messageId.size()) { + queriesInFlight[messageId] = context; + } } } @@ -263,13 +286,7 @@ musik::core::IIndexer* RemoteLibrary::Indexer() { void RemoteLibrary::ProcessMessage(musik::core::runtime::IMessage &message) { if (message.Type() == MESSAGE_QUERY_COMPLETED) { auto context = static_cast(&message)->GetContext(); - auto query = context->query; - - this->QueryCompleted(context->query.get()); - - if (context->callback) { - context->callback(query); - } + this->NotifyQueryCompleted(context); } } @@ -284,9 +301,9 @@ void RemoteLibrary::OnClientStateChanged(Client* client, State newState, State o } void RemoteLibrary::OnClientQuerySucceeded(Client* client, const std::string& messageId, Query query) { - + this->OnQueryCompleted(messageId, query); } void RemoteLibrary::OnClientQueryFailed(Client* client, const std::string& messageId, Query query, Client::ErrorCode result) { - + this->OnQueryCompleted(messageId, query); } \ No newline at end of file diff --git a/src/core/library/RemoteLibrary.h b/src/core/library/RemoteLibrary.h index ad76681b3..a842d2d0b 100644 --- a/src/core/library/RemoteLibrary.h +++ b/src/core/library/RemoteLibrary.h @@ -45,7 +45,7 @@ #include #include #include - +#include #include namespace musik { namespace core { namespace library { @@ -104,6 +104,10 @@ namespace musik { namespace core { namespace library { void RunQueryOnLoopback(QueryContextPtr context, bool notify); void RunQueryOnWebSocketClient(QueryContextPtr context, bool notify); + void OnQueryCompleted(const std::string& messageId, Query query); + void OnQueryCompleted(QueryContextPtr context); + void NotifyQueryCompleted(QueryContextPtr context); + void ThreadProc(); QueryContextPtr GetNextQuery(); @@ -116,6 +120,8 @@ namespace musik { namespace core { namespace library { int id; std::string name; + std::unordered_map queriesInFlight; + std::thread* thread; std::condition_variable_any queueCondition; std::recursive_mutex mutex; diff --git a/src/core/net/WebSocketClient.cpp b/src/core/net/WebSocketClient.cpp index 95597bfdc..6c34790bb 100644 --- a/src/core/net/WebSocketClient.cpp +++ b/src/core/net/WebSocketClient.cpp @@ -164,18 +164,15 @@ void WebSocketClient::Disconnect() { std::string WebSocketClient::EnqueueQuery(Query query) { std::unique_lockmutex)> lock(this->mutex); - if (this->state == State::Connected || this->state == State::Disconnected) { - auto messageId = generateMessageId(); - messageIdToQuery[messageId] = query; - if (this->state == State::Connected) { - this->client.send( - this->connection, - createSendRawQueryRequest(query->SerializeQuery(), messageId), - websocketpp::frame::opcode::text); - } - return messageId; + auto messageId = generateMessageId(); + messageIdToQuery[messageId] = query; + if (this->state == State::Connected) { + this->client.send( + this->connection, + createSendRawQueryRequest(query->SerializeQuery(), messageId), + websocketpp::frame::opcode::text); } - return ""; + return messageId; } void WebSocketClient::Reconnect() {