Go back to Discord RPC I/O Thread codepath for now - the non-I/O

thread codepath seems to be unreliable for now
This commit is contained in:
twinaphex 2021-04-13 16:44:46 +02:00
parent 5658ea7047
commit 65c01b35ec
3 changed files with 146 additions and 58 deletions

View File

@ -57,7 +57,9 @@ void Discord_RunCallbacks(void);
/* If you disable the lib starting its own I/O thread, /* If you disable the lib starting its own I/O thread,
* you'll need to call this from your own */ * you'll need to call this from your own */
#ifdef DISCORD_DISABLE_IO_THREAD
void Discord_UpdateConnection(void); void Discord_UpdateConnection(void);
#endif
void Discord_UpdatePresence(const DiscordRichPresence* presence); void Discord_UpdatePresence(const DiscordRichPresence* presence);
void Discord_ClearPresence(void); void Discord_ClearPresence(void);

View File

@ -10,10 +10,22 @@
#include <chrono> #include <chrono>
#include <mutex> #include <mutex>
#ifndef DISCORD_DISABLE_IO_THREAD
#include <condition_variable>
#include <thread>
#endif
struct QueuedMessage struct QueuedMessage
{ {
size_t length; size_t length;
char buffer[16384]; char buffer[16384];
void Copy(const QueuedMessage& other)
{
length = other.length;
if (length)
memcpy(buffer, other.buffer, length);
}
}; };
struct User struct User
@ -68,13 +80,66 @@ static User connectedUser;
static Backoff ReconnectTimeMs(500, 60 * 1000); static Backoff ReconnectTimeMs(500, 60 * 1000);
static auto NextConnect = std::chrono::system_clock::now(); static auto NextConnect = std::chrono::system_clock::now();
static void update_reconnect_time(void) #ifndef DISCORD_DISABLE_IO_THREAD
static void Discord_UpdateConnection(void);
class IoThreadHolder
{
private:
std::atomic_bool keepRunning{true};
std::mutex waitForIOMutex;
std::condition_variable waitForIOActivity;
std::thread ioThread;
public:
void Start()
{
keepRunning.store(true);
ioThread = std::thread([&]() {
const std::chrono::duration<int64_t, std::milli> maxWait{500LL};
Discord_UpdateConnection();
while (keepRunning.load()) {
std::unique_lock<std::mutex> lock(waitForIOMutex);
waitForIOActivity.wait_for(lock, maxWait);
Discord_UpdateConnection();
}
});
}
void Notify() { waitForIOActivity.notify_all(); }
void Stop()
{
keepRunning.exchange(false);
Notify();
if (ioThread.joinable())
ioThread.join();
}
~IoThreadHolder() { Stop(); }
};
#else
class IoThreadHolder
{
public:
void Start() {}
void Stop() {}
void Notify() {}
};
#endif /* DISCORD_DISABLE_IO_THREAD */
static IoThreadHolder* IoThread{nullptr};
static void UpdateReconnectTime(void)
{ {
NextConnect = std::chrono::system_clock::now() + NextConnect = std::chrono::system_clock::now() +
std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()}; std::chrono::duration<int64_t, std::milli>{ReconnectTimeMs.nextDelay()};
} }
#ifdef DISCORD_DISABLE_IO_THREAD
extern "C" void Discord_UpdateConnection(void) extern "C" void Discord_UpdateConnection(void)
#else
static void Discord_UpdateConnection(void)
#endif
{ {
if (!Connection) if (!Connection)
return; return;
@ -83,7 +148,7 @@ extern "C" void Discord_UpdateConnection(void)
{ {
if (std::chrono::system_clock::now() >= NextConnect) if (std::chrono::system_clock::now() >= NextConnect)
{ {
update_reconnect_time(); UpdateReconnectTime();
Connection->Open(); Connection->Open();
} }
} }
@ -203,19 +268,16 @@ extern "C" void Discord_UpdateConnection(void)
if (QueuedPresence.length) if (QueuedPresence.length)
{ {
QueuedMessage local; QueuedMessage local;
std::lock_guard<std::mutex> guard(PresenceMutex); {
local.length = QueuedPresence.length; std::lock_guard<std::mutex> guard(PresenceMutex);
if (local.length) local.Copy(QueuedPresence);
memcpy(local.buffer, QueuedPresence.buffer, local.length); QueuedPresence.length = 0;
QueuedPresence.length = 0; }
if (!Connection->Write(local.buffer, local.length)) if (!Connection->Write(local.buffer, local.length))
{ {
/* if we fail to send, requeue */ /* if we fail to send, requeue */
std::lock_guard<std::mutex> guard(PresenceMutex); std::lock_guard<std::mutex> guard(PresenceMutex);
QueuedPresence.length = local.length; QueuedPresence.Copy(local);
if (QueuedPresence.length)
memcpy(QueuedPresence.buffer, local.buffer, QueuedPresence.length);
} }
} }
@ -236,6 +298,8 @@ static bool RegisterForEvent(const char* evtName)
qmessage->length = qmessage->length =
JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName); JsonWriteSubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
SendQueue.CommitAdd(); SendQueue.CommitAdd();
if (IoThread)
IoThread->Notify();
return true; return true;
} }
return false; return false;
@ -249,6 +313,8 @@ static bool DeregisterForEvent(const char* evtName)
qmessage->length = qmessage->length =
JsonWriteUnsubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName); JsonWriteUnsubscribeCommand(qmessage->buffer, sizeof(qmessage->buffer), Nonce++, evtName);
SendQueue.CommitAdd(); SendQueue.CommitAdd();
if (IoThread)
IoThread->Notify();
return true; return true;
} }
return false; return false;
@ -260,6 +326,10 @@ extern "C" void Discord_Initialize(
int autoRegister, int autoRegister,
const char* optionalSteamId) const char* optionalSteamId)
{ {
IoThread = new (std::nothrow) IoThreadHolder();
if (!IoThread)
return;
if (autoRegister) if (autoRegister)
{ {
if (optionalSteamId && optionalSteamId[0]) if (optionalSteamId && optionalSteamId[0])
@ -284,38 +354,34 @@ extern "C" void Discord_Initialize(
if (Connection) if (Connection)
return; return;
Connection = RpcConnection::Create(applicationId); Connection = RpcConnection::Create(applicationId);
Connection->onConnect = [](JsonDocument& readyMessage) Connection->onConnect = [](JsonDocument& readyMessage)
{ {
Discord_UpdateHandlers(&QueuedHandlers);
char *userId = NULL; char *userId = NULL;
char *username = NULL; char *username = NULL;
char *avatar = NULL; char *avatar = NULL;
char *discriminator = NULL; char *discriminator = NULL;
bool in_data = false;
bool in_user = false;
Discord_UpdateHandlers(&QueuedHandlers);
bool in_data = false, in_user = false;
for (JsonReader r(readyMessage); r.NextKey();) for (JsonReader r(readyMessage); r.NextKey();)
{ {
if (r.depth == 1) if (r.depth == 1)
{ {
in_data = !strcmp(r.key,"data"); in_data = !strcmp(r.key,"data");
in_user = false; in_user = false;
} }
else if (r.depth == 2 && in_data) else if (r.depth == 2 && in_data)
in_user = !strcmp(r.key, "user"); {
else if (r.depth == 3 && in_user) in_user = !strcmp(r.key, "user");
{ }
if (!strcmp(r.key, "id" )) else if (r.depth == 3 && in_user)
r.NextStrDup(&userId); {
else if (!strcmp(r.key, "username" )) if (!strcmp(r.key, "id" )) r.NextStrDup(&userId);
r.NextStrDup(&username); else if (!strcmp(r.key, "username" )) r.NextStrDup(&username);
else if (!strcmp(r.key, "avatar" )) else if (!strcmp(r.key, "avatar" )) r.NextStrDup(&avatar);
r.NextStrDup(&avatar); else if (!strcmp(r.key, "discriminator")) r.NextStrDup(&discriminator);
else if (!strcmp(r.key, "discriminator")) }
r.NextStrDup(&discriminator);
}
} }
if (userId && username) if (userId && username)
@ -332,14 +398,10 @@ extern "C" void Discord_Initialize(
WasJustConnected.exchange(true); WasJustConnected.exchange(true);
ReconnectTimeMs.reset(); ReconnectTimeMs.reset();
if (userId) if (userId ) free(userId );
free(userId); if (username ) free(username );
if (username) if (avatar ) free(avatar );
free(username); if (discriminator) free(discriminator);
if (avatar)
free(avatar);
if (discriminator)
free(discriminator);
}; };
Connection->onDisconnect = [](int err, const char* message) Connection->onDisconnect = [](int err, const char* message)
{ {
@ -350,8 +412,10 @@ extern "C" void Discord_Initialize(
Handlers = {}; Handlers = {};
} }
WasJustDisconnected.exchange(true); WasJustDisconnected.exchange(true);
update_reconnect_time(); UpdateReconnectTime();
}; };
IoThread->Start();
} }
extern "C" void Discord_Shutdown(void) extern "C" void Discord_Shutdown(void)
@ -360,17 +424,26 @@ extern "C" void Discord_Shutdown(void)
return; return;
Connection->onConnect = nullptr; Connection->onConnect = nullptr;
Connection->onDisconnect = nullptr; Connection->onDisconnect = nullptr;
Handlers = {}; Handlers = {};
if (IoThread)
{
IoThread->Stop();
delete IoThread;
IoThread = nullptr;
}
RpcConnection::Destroy(Connection); RpcConnection::Destroy(Connection);
} }
extern "C" void Discord_UpdatePresence(const DiscordRichPresence* presence) extern "C" void Discord_UpdatePresence(const DiscordRichPresence* presence)
{ {
std::lock_guard<std::mutex> guard(PresenceMutex); {
QueuedPresence.length = JsonWriteRichPresenceObj( std::lock_guard<std::mutex> guard(PresenceMutex);
QueuedPresence.buffer, sizeof(QueuedPresence.buffer), QueuedPresence.length = JsonWriteRichPresenceObj(
Nonce++, Pid, presence); QueuedPresence.buffer, sizeof(QueuedPresence.buffer), Nonce++, Pid, presence);
}
if (IoThread)
IoThread->Notify();
} }
extern "C" void Discord_ClearPresence(void) extern "C" void Discord_ClearPresence(void)
@ -378,8 +451,7 @@ extern "C" void Discord_ClearPresence(void)
Discord_UpdatePresence(nullptr); Discord_UpdatePresence(nullptr);
} }
extern "C" void Discord_Respond(const char* userId, extern "C" void Discord_Respond(const char* userId, /* DISCORD_REPLY_ */ int reply)
/* DISCORD_REPLY_ */ int reply)
{ {
/* if we are not connected, let's not batch up stale messages for later */ /* if we are not connected, let's not batch up stale messages for later */
if (!Connection || !Connection->IsOpen()) if (!Connection || !Connection->IsOpen())
@ -390,6 +462,8 @@ extern "C" void Discord_Respond(const char* userId,
qmessage->length = qmessage->length =
JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++); JsonWriteJoinReply(qmessage->buffer, sizeof(qmessage->buffer), userId, reply, Nonce++);
SendQueue.CommitAdd(); SendQueue.CommitAdd();
if (IoThread)
IoThread->Notify();
} }
} }
@ -456,14 +530,16 @@ extern "C" void Discord_RunCallbacks(void)
*/ */
while (JoinAskQueue.HavePendingSends()) while (JoinAskQueue.HavePendingSends())
{ {
auto req = JoinAskQueue.GetNextSendMessage(); auto req = JoinAskQueue.GetNextSendMessage();
std::lock_guard<std::mutex> guard(HandlerMutex); {
if (Handlers.joinRequest) std::lock_guard<std::mutex> guard(HandlerMutex);
{ if (Handlers.joinRequest)
DiscordUser du{req->userId, req->username, req->discriminator, req->avatar}; {
Handlers.joinRequest(&du); DiscordUser du{req->userId, req->username, req->discriminator, req->avatar};
} Handlers.joinRequest(&du);
JoinAskQueue.CommitSend(); }
}
JoinAskQueue.CommitSend();
} }
if (!isConnected) if (!isConnected)

View File

@ -6708,7 +6708,9 @@ void discord_update(enum discord_presence presence)
#endif #endif
Discord_UpdatePresence(&discord_st->presence); Discord_UpdatePresence(&discord_st->presence);
#ifdef DISCORD_DISABLE_IO_THREAD
Discord_UpdateConnection(); Discord_UpdateConnection();
#endif
discord_st->status = presence; discord_st->status = presence;
} }
@ -6730,7 +6732,9 @@ static void discord_init(
handlers.joinRequest = handle_discord_join_request; handlers.joinRequest = handle_discord_join_request;
Discord_Initialize(discord_app_id, &handlers, 0, NULL); Discord_Initialize(discord_app_id, &handlers, 0, NULL);
#ifdef DISCORD_DISABLE_IO_THREAD
Discord_UpdateConnection(); Discord_UpdateConnection();
#endif
#ifdef _WIN32 #ifdef _WIN32
fill_pathname_application_path(full_path, sizeof(full_path)); fill_pathname_application_path(full_path, sizeof(full_path));
@ -6748,7 +6752,9 @@ static void discord_init(
#endif #endif
RARCH_LOG("[DISCORD]: Registering startup command: %s\n", command); RARCH_LOG("[DISCORD]: Registering startup command: %s\n", command);
Discord_Register(discord_app_id, command); Discord_Register(discord_app_id, command);
#ifdef DISCORD_DISABLE_IO_THREAD
Discord_UpdateConnection(); Discord_UpdateConnection();
#endif
discord_st->ready = true; discord_st->ready = true;
} }
#endif #endif
@ -36630,7 +36636,9 @@ bool retroarch_main_quit(void)
if (discord_st->ready) if (discord_st->ready)
{ {
Discord_ClearPresence(); Discord_ClearPresence();
#ifdef DISCORD_DISABLE_IO_THREAD
Discord_UpdateConnection(); Discord_UpdateConnection();
#endif
Discord_Shutdown(); Discord_Shutdown();
discord_st->ready = false; discord_st->ready = false;
} }
@ -38118,7 +38126,9 @@ int runloop_iterate(void)
if (discord_is_inited) if (discord_is_inited)
{ {
Discord_RunCallbacks(); Discord_RunCallbacks();
#ifdef DISCORD_DISABLE_IO_THREAD
Discord_UpdateConnection(); Discord_UpdateConnection();
#endif
} }
#endif #endif