Replace condition_variable with atomic_t

This commit is contained in:
RipleyTom 2023-12-20 10:47:17 +01:00 committed by Elad Ashkenazi
parent 2eae0a9d3a
commit dbd1f27862
3 changed files with 61 additions and 22 deletions

View File

@ -1,7 +1,5 @@
#include "stdafx.h"
#include <condition_variable>
#include "Utilities/Thread.h"
#include "util/asm.hpp"
#include "util/atomic.hpp"
@ -37,6 +35,7 @@ public:
msgs.insert(std::make_pair(expected_time, std::move(msg)));
}
wakey.release(1);
wakey.notify_one(); // TODO: Should be improved to only wake if new timeout < old timeout
}
@ -71,23 +70,22 @@ public:
void operator()()
{
atomic_wait_timeout timeout = atomic_wait_timeout::inf;
while (thread_ctrl::state() != thread_state::aborting)
{
std::unique_lock<std::mutex> lock(data_mutex);
if (thread_ctrl::state() == thread_state::aborting)
if (!wakey)
{
break;
wakey.wait(0, timeout);
}
if (msgs.size())
wakey.wait_until(lock, msgs.begin()->first);
else
wakey.wait(lock);
wakey = 0;
if (thread_ctrl::state() == thread_state::aborting)
return;
std::lock_guard lock(data_mutex);
const auto now = steady_clock::now();
// Check for messages that haven't been acked
std::set<s32> rtt_increased;
@ -151,13 +149,30 @@ public:
msgs.insert(std::make_pair(now + rtt.rtt_time, std::move(msg)));
it = msgs.erase(it);
}
if (!msgs.empty())
{
const auto current_timepoint = steady_clock::now();
const auto expected_timepoint = msgs.begin()->first;
if (current_timepoint > expected_timepoint)
{
wakey = 1;
}
else
{
timeout = static_cast<atomic_wait_timeout>(std::chrono::duration_cast<std::chrono::nanoseconds>(expected_timepoint - current_timepoint).count());
}
}
else
{
timeout = atomic_wait_timeout::inf;
}
}
}
tcp_timeout_monitor& operator=(thread_state)
{
data_mutex.lock();
data_mutex.unlock();
wakey.release(1);
wakey.notify_one();
return *this;
}
@ -166,8 +181,8 @@ public:
static constexpr auto thread_name = "Tcp Over Udp Timeout Manager Thread"sv;
private:
std::condition_variable wakey;
std::mutex data_mutex;
atomic_t<u32> wakey;
shared_mutex data_mutex;
// List of outgoing messages
struct message
{

View File

@ -344,17 +344,22 @@ void signaling_handler::process_incoming_messages()
void signaling_handler::operator()()
{
atomic_wait_timeout timeout = atomic_wait_timeout::inf;
while (thread_ctrl::state() != thread_state::aborting)
{
std::unique_lock<std::mutex> lock(data_mutex);
if (!qpackets.empty())
wakey.wait_until(lock, qpackets.begin()->first);
else
wakey.wait(lock);
if (!wakey)
{
wakey.wait(0, timeout);
}
wakey = 0;
if (thread_ctrl::state() == thread_state::aborting)
return;
std::lock_guard lock(data_mutex);
process_incoming_messages();
const auto now = steady_clock::now();
@ -416,16 +421,36 @@ void signaling_handler::operator()()
it++;
reschedule_packet(si, cmd, now + delay);
}
if (!qpackets.empty())
{
const auto current_timepoint = steady_clock::now();
const auto expected_timepoint = qpackets.begin()->first;
if (current_timepoint > expected_timepoint)
{
wakey = 1;
}
else
{
timeout = static_cast<atomic_wait_timeout>(std::chrono::duration_cast<std::chrono::nanoseconds>(expected_timepoint - current_timepoint).count());
}
}
else
{
timeout = atomic_wait_timeout::inf;
}
}
}
void signaling_handler::wake_up()
{
wakey.release(1);
wakey.notify_one();
}
signaling_handler& signaling_handler::operator=(thread_state)
{
wakey.release(1);
wakey.notify_one();
return *this;
}

View File

@ -5,7 +5,6 @@
#include "Emu/Cell/Modules/sceNp2.h"
#include "Utilities/Thread.h"
#include <unordered_map>
#include <condition_variable>
#include <chrono>
#include <optional>
@ -134,8 +133,8 @@ private:
void retire_all_packets(std::shared_ptr<signaling_info>& si);
void stop_sig_nl(u32 conn_id);
std::mutex data_mutex;
std::condition_variable wakey;
shared_mutex data_mutex;
atomic_t<u32> wakey;
signaling_packet sig_packet{};