diff --git a/deps/discord-rpc/include/discord_rpc.h b/deps/discord-rpc/include/discord_rpc.h index 5cd4e80ae5..cc23e30828 100644 --- a/deps/discord-rpc/include/discord_rpc.h +++ b/deps/discord-rpc/include/discord_rpc.h @@ -57,7 +57,9 @@ void Discord_RunCallbacks(void); /* If you disable the lib starting its own I/O thread, * you'll need to call this from your own */ +#ifdef DISCORD_DISABLE_IO_THREAD void Discord_UpdateConnection(void); +#endif void Discord_UpdatePresence(const DiscordRichPresence* presence); void Discord_ClearPresence(void); diff --git a/deps/discord-rpc/src/discord_rpc.cpp b/deps/discord-rpc/src/discord_rpc.cpp index 66c3ae96e4..afe5676279 100644 --- a/deps/discord-rpc/src/discord_rpc.cpp +++ b/deps/discord-rpc/src/discord_rpc.cpp @@ -10,10 +10,22 @@ #include #include +#ifndef DISCORD_DISABLE_IO_THREAD +#include +#include +#endif + struct QueuedMessage { size_t length; char buffer[16384]; + + void Copy(const QueuedMessage& other) + { + length = other.length; + if (length) + memcpy(buffer, other.buffer, length); + } }; struct User @@ -68,13 +80,66 @@ static User connectedUser; static Backoff ReconnectTimeMs(500, 60 * 1000); static auto NextConnect = std::chrono::system_clock::now(); -static void update_reconnect_time(void) +#ifndef DISCORD_DISABLE_IO_THREAD +static void Discord_UpdateConnection(void); +class IoThreadHolder +{ + private: + std::atomic_bool keepRunning{true}; + std::mutex waitForIOMutex; + std::condition_variable waitForIOActivity; + std::thread ioThread; + + public: + void Start() + { + keepRunning.store(true); + ioThread = std::thread([&]() { + const std::chrono::duration maxWait{500LL}; + Discord_UpdateConnection(); + while (keepRunning.load()) { + std::unique_lock lock(waitForIOMutex); + waitForIOActivity.wait_for(lock, maxWait); + Discord_UpdateConnection(); + } + }); + } + + void Notify() { waitForIOActivity.notify_all(); } + + void Stop() + { + keepRunning.exchange(false); + Notify(); + if (ioThread.joinable()) + ioThread.join(); + } + + ~IoThreadHolder() { Stop(); } +}; +#else +class IoThreadHolder +{ + public: + void Start() {} + void Stop() {} + void Notify() {} +}; +#endif /* DISCORD_DISABLE_IO_THREAD */ + +static IoThreadHolder* IoThread{nullptr}; + +static void UpdateReconnectTime(void) { NextConnect = std::chrono::system_clock::now() + std::chrono::duration{ReconnectTimeMs.nextDelay()}; } +#ifdef DISCORD_DISABLE_IO_THREAD extern "C" void Discord_UpdateConnection(void) +#else +static void Discord_UpdateConnection(void) +#endif { if (!Connection) return; @@ -83,7 +148,7 @@ extern "C" void Discord_UpdateConnection(void) { if (std::chrono::system_clock::now() >= NextConnect) { - update_reconnect_time(); + UpdateReconnectTime(); Connection->Open(); } } @@ -203,19 +268,16 @@ extern "C" void Discord_UpdateConnection(void) if (QueuedPresence.length) { QueuedMessage local; - std::lock_guard guard(PresenceMutex); - local.length = QueuedPresence.length; - if (local.length) - memcpy(local.buffer, QueuedPresence.buffer, local.length); - QueuedPresence.length = 0; - + { + std::lock_guard guard(PresenceMutex); + local.Copy(QueuedPresence); + QueuedPresence.length = 0; + } if (!Connection->Write(local.buffer, local.length)) { /* if we fail to send, requeue */ std::lock_guard guard(PresenceMutex); - QueuedPresence.length = local.length; - if (QueuedPresence.length) - memcpy(QueuedPresence.buffer, local.buffer, QueuedPresence.length); + QueuedPresence.Copy(local); } } @@ -236,6 +298,8 @@ static bool RegisterForEvent(const char* evtName) qmessage->length = JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName); SendQueue.CommitAdd(); + if (IoThread) + IoThread->Notify(); return true; } return false; @@ -249,6 +313,8 @@ static bool DeregisterForEvent(const char* evtName) qmessage->length = JsonWriteUnsubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName); SendQueue.CommitAdd(); + if (IoThread) + IoThread->Notify(); return true; } return false; @@ -260,6 +326,10 @@ extern "C" void Discord_Initialize( int autoRegister, const char* optionalSteamId) { + IoThread = new (std::nothrow) IoThreadHolder(); + if (!IoThread) + return; + if (autoRegister) { if (optionalSteamId && optionalSteamId[0]) @@ -284,38 +354,34 @@ extern "C" void Discord_Initialize( if (Connection) return; - Connection = RpcConnection::Create(applicationId); + Connection = RpcConnection::Create(applicationId); Connection->onConnect = [](JsonDocument& readyMessage) { + Discord_UpdateHandlers(&QueuedHandlers); char *userId = NULL; char *username = NULL; char *avatar = NULL; char *discriminator = NULL; - bool in_data = false; - bool in_user = false; - - Discord_UpdateHandlers(&QueuedHandlers); + bool in_data = false, in_user = false; for (JsonReader r(readyMessage); r.NextKey();) { - if (r.depth == 1) - { - in_data = !strcmp(r.key,"data"); - in_user = false; - } - else if (r.depth == 2 && in_data) - in_user = !strcmp(r.key, "user"); - else if (r.depth == 3 && in_user) - { - if (!strcmp(r.key, "id" )) - r.NextStrDup(&userId); - else if (!strcmp(r.key, "username" )) - r.NextStrDup(&username); - else if (!strcmp(r.key, "avatar" )) - r.NextStrDup(&avatar); - else if (!strcmp(r.key, "discriminator")) - r.NextStrDup(&discriminator); - } + if (r.depth == 1) + { + in_data = !strcmp(r.key,"data"); + in_user = false; + } + else if (r.depth == 2 && in_data) + { + in_user = !strcmp(r.key, "user"); + } + else if (r.depth == 3 && in_user) + { + if (!strcmp(r.key, "id" )) r.NextStrDup(&userId); + else if (!strcmp(r.key, "username" )) r.NextStrDup(&username); + else if (!strcmp(r.key, "avatar" )) r.NextStrDup(&avatar); + else if (!strcmp(r.key, "discriminator")) r.NextStrDup(&discriminator); + } } if (userId && username) @@ -332,14 +398,10 @@ extern "C" void Discord_Initialize( WasJustConnected.exchange(true); ReconnectTimeMs.reset(); - if (userId) - free(userId); - if (username) - free(username); - if (avatar) - free(avatar); - if (discriminator) - free(discriminator); + if (userId ) free(userId ); + if (username ) free(username ); + if (avatar ) free(avatar ); + if (discriminator) free(discriminator); }; Connection->onDisconnect = [](int err, const char* message) { @@ -350,8 +412,10 @@ extern "C" void Discord_Initialize( Handlers = {}; } WasJustDisconnected.exchange(true); - update_reconnect_time(); + UpdateReconnectTime(); }; + + IoThread->Start(); } extern "C" void Discord_Shutdown(void) @@ -360,17 +424,26 @@ extern "C" void Discord_Shutdown(void) return; Connection->onConnect = nullptr; Connection->onDisconnect = nullptr; - Handlers = {}; + Handlers = {}; + if (IoThread) + { + IoThread->Stop(); + delete IoThread; + IoThread = nullptr; + } RpcConnection::Destroy(Connection); } extern "C" void Discord_UpdatePresence(const DiscordRichPresence* presence) { - std::lock_guard guard(PresenceMutex); - QueuedPresence.length = JsonWriteRichPresenceObj( - QueuedPresence.buffer, sizeof(QueuedPresence.buffer), - Nonce++, Pid, presence); + { + std::lock_guard guard(PresenceMutex); + QueuedPresence.length = JsonWriteRichPresenceObj( + QueuedPresence.buffer, sizeof(QueuedPresence.buffer), Nonce++, Pid, presence); + } + if (IoThread) + IoThread->Notify(); } extern "C" void Discord_ClearPresence(void) @@ -378,8 +451,7 @@ extern "C" void Discord_ClearPresence(void) Discord_UpdatePresence(nullptr); } -extern "C" void Discord_Respond(const char* userId, - /* DISCORD_REPLY_ */ int reply) +extern "C" void Discord_Respond(const char* userId, /* DISCORD_REPLY_ */ int reply) { /* if we are not connected, let's not batch up stale messages for later */ if (!Connection || !Connection->IsOpen()) @@ -390,6 +462,8 @@ extern "C" void Discord_Respond(const char* userId, qmessage->length = JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++); SendQueue.CommitAdd(); + if (IoThread) + IoThread->Notify(); } } @@ -456,14 +530,16 @@ extern "C" void Discord_RunCallbacks(void) */ while (JoinAskQueue.HavePendingSends()) { - auto req = JoinAskQueue.GetNextSendMessage(); - std::lock_guard guard(HandlerMutex); - if (Handlers.joinRequest) - { - DiscordUser du{req->userId, req->username, req->discriminator, req->avatar}; - Handlers.joinRequest(&du); - } - JoinAskQueue.CommitSend(); + auto req = JoinAskQueue.GetNextSendMessage(); + { + std::lock_guard guard(HandlerMutex); + if (Handlers.joinRequest) + { + DiscordUser du{req->userId, req->username, req->discriminator, req->avatar}; + Handlers.joinRequest(&du); + } + } + JoinAskQueue.CommitSend(); } if (!isConnected) diff --git a/retroarch.c b/retroarch.c index bce3aba38d..527814e28a 100644 --- a/retroarch.c +++ b/retroarch.c @@ -6708,7 +6708,9 @@ void discord_update(enum discord_presence presence) #endif Discord_UpdatePresence(&discord_st->presence); +#ifdef DISCORD_DISABLE_IO_THREAD Discord_UpdateConnection(); +#endif discord_st->status = presence; } @@ -6730,7 +6732,9 @@ static void discord_init( handlers.joinRequest = handle_discord_join_request; Discord_Initialize(discord_app_id, &handlers, 0, NULL); +#ifdef DISCORD_DISABLE_IO_THREAD Discord_UpdateConnection(); +#endif #ifdef _WIN32 fill_pathname_application_path(full_path, sizeof(full_path)); @@ -6748,7 +6752,9 @@ static void discord_init( #endif RARCH_LOG("[DISCORD]: Registering startup command: %s\n", command); Discord_Register(discord_app_id, command); +#ifdef DISCORD_DISABLE_IO_THREAD Discord_UpdateConnection(); +#endif discord_st->ready = true; } #endif @@ -36630,7 +36636,9 @@ bool retroarch_main_quit(void) if (discord_st->ready) { Discord_ClearPresence(); +#ifdef DISCORD_DISABLE_IO_THREAD Discord_UpdateConnection(); +#endif Discord_Shutdown(); discord_st->ready = false; } @@ -38118,7 +38126,9 @@ int runloop_iterate(void) if (discord_is_inited) { Discord_RunCallbacks(); +#ifdef DISCORD_DISABLE_IO_THREAD Discord_UpdateConnection(); +#endif } #endif