Kind of sort of working now. Need to get synchronous queries working,

even though they are generally undesireable.
This commit is contained in:
casey langen 2020-10-06 21:29:08 -07:00
parent 3f0603656b
commit 72e8895aa9
3 changed files with 63 additions and 43 deletions

View File

@ -192,61 +192,84 @@ void RemoteLibrary::ThreadProc() {
} }
} }
void RemoteLibrary::NotifyQueryCompleted(QueryContextPtr context) {
this->QueryCompleted(context->query.get());
if (context->callback) {
context->callback(context->query);
}
}
void RemoteLibrary::OnQueryCompleted(QueryContextPtr context) {
if (context) {
if (this->messageQueue) {
this->messageQueue->Post(std::make_shared<QueryCompletedMessage>(this, context));
}
else {
this->NotifyQueryCompleted(context);
}
}
}
void RemoteLibrary::OnQueryCompleted(const std::string& messageId, Query query) {
QueryContextPtr context;
{
std::unique_lock<std::recursive_mutex> lock(this->mutex);
context = queriesInFlight[messageId];
queriesInFlight.erase(messageId);
}
if (context) {
this->OnQueryCompleted(context);
}
}
void RemoteLibrary::RunQuery(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQuery(QueryContextPtr context, bool notify) {
#if 0
this->RunQueryOnLoopback(context, notify); this->RunQueryOnLoopback(context, notify);
#else
this->RunQueryOnWebSocketClient(context, notify);
#endif
} }
void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQueryOnLoopback(QueryContextPtr context, bool notify) {
if (context) { if (context) {
/* CAL TODO: eventually make the request over a websocket. right now we just /* do everything via loopback to the local library for testing. we do, however,
do everything via loopback to the local library for testing. we do, however,
go through the motions by serializing the inbound query to a string, then go through the motions by serializing the inbound query to a string, then
bouncing through the QueryRegister to create a new instance, run the query bouncing through the QueryRegister to create a new instance, run the query
locally, serialize the result, then deserialize it again to emulate the entire locally, serialize the result, then deserialize it again to emulate the entire
flow. */ flow. */
wsc.EnqueueQuery(context->query);
auto localLibrary = LibraryFactory::Instance().Default(); auto localLibrary = LibraryFactory::Instance().Default();
localLibrary->SetMessageQueue(*this->messageQueue); localLibrary->SetMessageQueue(*this->messageQueue);
auto localQuery = QueryRegistry::CreateLocalQueryFor( auto localQuery = QueryRegistry::CreateLocalQueryFor(
context->query->Name(), context->query->SerializeQuery(), localLibrary); context->query->Name(), context->query->SerializeQuery(), localLibrary);
auto onComplete = [this, notify, context, localQuery]() {
if (notify) {
if (this->messageQueue) {
this->messageQueue->Post(std::make_shared<QueryCompletedMessage>(this, context));
}
else {
this->QueryCompleted(localQuery.get());
}
}
else if (context->callback) {
context->callback(context->query);
}
};
if (!localQuery) { if (!localQuery) {
onComplete(); OnQueryCompleted(context);
return; return;
} }
localLibrary->Enqueue( localLibrary->Enqueue(
localQuery, localQuery,
ILibrary::QuerySynchronous, /* CAL TODO: make async! we have to make TrackList support async lookup first tho. */ ILibrary::QuerySynchronous, /* CAL TODO: make async! we have to make TrackList support async lookup first tho. */
[context, onComplete, localQuery](auto result) { [this, context, localQuery](auto result) {
if (localQuery->GetStatus() == IQuery::Finished) { if (localQuery->GetStatus() == IQuery::Finished) {
context->query->DeserializeResult(localQuery->SerializeResult()); context->query->DeserializeResult(localQuery->SerializeResult());
} }
onComplete(); this->OnQueryCompleted(context);
}); });
} }
} }
void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context, bool notify) { void RemoteLibrary::RunQueryOnWebSocketClient(QueryContextPtr context, bool notify) {
if (context->query) { if (context->query) {
wsc.EnqueueQuery(context->query); std::unique_lock<std::recursive_mutex> lock(this->mutex);
const std::string messageId = wsc.EnqueueQuery(context->query);
if (messageId.size()) {
queriesInFlight[messageId] = context;
}
} }
} }
@ -263,13 +286,7 @@ musik::core::IIndexer* RemoteLibrary::Indexer() {
void RemoteLibrary::ProcessMessage(musik::core::runtime::IMessage &message) { void RemoteLibrary::ProcessMessage(musik::core::runtime::IMessage &message) {
if (message.Type() == MESSAGE_QUERY_COMPLETED) { if (message.Type() == MESSAGE_QUERY_COMPLETED) {
auto context = static_cast<QueryCompletedMessage*>(&message)->GetContext(); auto context = static_cast<QueryCompletedMessage*>(&message)->GetContext();
auto query = context->query; this->NotifyQueryCompleted(context);
this->QueryCompleted(context->query.get());
if (context->callback) {
context->callback(query);
}
} }
} }
@ -284,9 +301,9 @@ void RemoteLibrary::OnClientStateChanged(Client* client, State newState, State o
} }
void RemoteLibrary::OnClientQuerySucceeded(Client* client, const std::string& messageId, Query query) { void RemoteLibrary::OnClientQuerySucceeded(Client* client, const std::string& messageId, Query query) {
this->OnQueryCompleted(messageId, query);
} }
void RemoteLibrary::OnClientQueryFailed(Client* client, const std::string& messageId, Query query, Client::ErrorCode result) { void RemoteLibrary::OnClientQueryFailed(Client* client, const std::string& messageId, Query query, Client::ErrorCode result) {
this->OnQueryCompleted(messageId, query);
} }

View File

@ -45,7 +45,7 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <unordered_map>
#include <string> #include <string>
namespace musik { namespace core { namespace library { namespace musik { namespace core { namespace library {
@ -104,6 +104,10 @@ namespace musik { namespace core { namespace library {
void RunQueryOnLoopback(QueryContextPtr context, bool notify); void RunQueryOnLoopback(QueryContextPtr context, bool notify);
void RunQueryOnWebSocketClient(QueryContextPtr context, bool notify); void RunQueryOnWebSocketClient(QueryContextPtr context, bool notify);
void OnQueryCompleted(const std::string& messageId, Query query);
void OnQueryCompleted(QueryContextPtr context);
void NotifyQueryCompleted(QueryContextPtr context);
void ThreadProc(); void ThreadProc();
QueryContextPtr GetNextQuery(); QueryContextPtr GetNextQuery();
@ -116,6 +120,8 @@ namespace musik { namespace core { namespace library {
int id; int id;
std::string name; std::string name;
std::unordered_map<std::string, QueryContextPtr> queriesInFlight;
std::thread* thread; std::thread* thread;
std::condition_variable_any queueCondition; std::condition_variable_any queueCondition;
std::recursive_mutex mutex; std::recursive_mutex mutex;

View File

@ -164,7 +164,6 @@ void WebSocketClient::Disconnect() {
std::string WebSocketClient::EnqueueQuery(Query query) { std::string WebSocketClient::EnqueueQuery(Query query) {
std::unique_lock<decltype(this->mutex)> lock(this->mutex); std::unique_lock<decltype(this->mutex)> lock(this->mutex);
if (this->state == State::Connected || this->state == State::Disconnected) {
auto messageId = generateMessageId(); auto messageId = generateMessageId();
messageIdToQuery[messageId] = query; messageIdToQuery[messageId] = query;
if (this->state == State::Connected) { if (this->state == State::Connected) {
@ -175,8 +174,6 @@ std::string WebSocketClient::EnqueueQuery(Query query) {
} }
return messageId; return messageId;
} }
return "";
}
void WebSocketClient::Reconnect() { void WebSocketClient::Reconnect() {
std::unique_lock<decltype(this->mutex)> lock(this->mutex); std::unique_lock<decltype(this->mutex)> lock(this->mutex);