diff --git a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp index f6ae0158a1..93072c8cc1 100644 --- a/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp @@ -1,7 +1,5 @@ #include "stdafx.h" -#include - #include "Utilities/Thread.h" #include "util/asm.hpp" #include "util/atomic.hpp" @@ -37,6 +35,7 @@ public: msgs.insert(std::make_pair(expected_time, std::move(msg))); } + wakey.release(1); wakey.notify_one(); // TODO: Should be improved to only wake if new timeout < old timeout } @@ -71,23 +70,22 @@ public: void operator()() { + atomic_wait_timeout timeout = atomic_wait_timeout::inf; + while (thread_ctrl::state() != thread_state::aborting) { - std::unique_lock lock(data_mutex); - - if (thread_ctrl::state() == thread_state::aborting) + if (!wakey) { - break; + wakey.wait(0, timeout); } - if (msgs.size()) - wakey.wait_until(lock, msgs.begin()->first); - else - wakey.wait(lock); + wakey = 0; if (thread_ctrl::state() == thread_state::aborting) return; + std::lock_guard lock(data_mutex); + const auto now = steady_clock::now(); // Check for messages that haven't been acked std::set rtt_increased; @@ -151,13 +149,30 @@ public: msgs.insert(std::make_pair(now + rtt.rtt_time, std::move(msg))); it = msgs.erase(it); } + + if (!msgs.empty()) + { + const auto current_timepoint = steady_clock::now(); + const auto expected_timepoint = msgs.begin()->first; + if (current_timepoint > expected_timepoint) + { + wakey = 1; + } + else + { + timeout = static_cast(std::chrono::duration_cast(expected_timepoint - current_timepoint).count()); + } + } + else + { + timeout = atomic_wait_timeout::inf; + } } } tcp_timeout_monitor& operator=(thread_state) { - data_mutex.lock(); - data_mutex.unlock(); + wakey.release(1); wakey.notify_one(); return *this; } @@ -166,8 +181,8 @@ public: static constexpr auto thread_name = "Tcp Over Udp Timeout Manager Thread"sv; private: - std::condition_variable wakey; - std::mutex data_mutex; + atomic_t wakey; + shared_mutex data_mutex; // List of outgoing messages struct message { diff --git a/rpcs3/Emu/NP/signaling_handler.cpp b/rpcs3/Emu/NP/signaling_handler.cpp index f161d24137..85f5558443 100644 --- a/rpcs3/Emu/NP/signaling_handler.cpp +++ b/rpcs3/Emu/NP/signaling_handler.cpp @@ -344,17 +344,22 @@ void signaling_handler::process_incoming_messages() void signaling_handler::operator()() { + atomic_wait_timeout timeout = atomic_wait_timeout::inf; + while (thread_ctrl::state() != thread_state::aborting) { - std::unique_lock lock(data_mutex); - if (!qpackets.empty()) - wakey.wait_until(lock, qpackets.begin()->first); - else - wakey.wait(lock); + if (!wakey) + { + wakey.wait(0, timeout); + } + + wakey = 0; if (thread_ctrl::state() == thread_state::aborting) return; + std::lock_guard lock(data_mutex); + process_incoming_messages(); const auto now = steady_clock::now(); @@ -416,16 +421,36 @@ void signaling_handler::operator()() it++; reschedule_packet(si, cmd, now + delay); } + + if (!qpackets.empty()) + { + const auto current_timepoint = steady_clock::now(); + const auto expected_timepoint = qpackets.begin()->first; + if (current_timepoint > expected_timepoint) + { + wakey = 1; + } + else + { + timeout = static_cast(std::chrono::duration_cast(expected_timepoint - current_timepoint).count()); + } + } + else + { + timeout = atomic_wait_timeout::inf; + } } } void signaling_handler::wake_up() { + wakey.release(1); wakey.notify_one(); } signaling_handler& signaling_handler::operator=(thread_state) { + wakey.release(1); wakey.notify_one(); return *this; } diff --git a/rpcs3/Emu/NP/signaling_handler.h b/rpcs3/Emu/NP/signaling_handler.h index cf611ff625..3a56568177 100644 --- a/rpcs3/Emu/NP/signaling_handler.h +++ b/rpcs3/Emu/NP/signaling_handler.h @@ -5,7 +5,6 @@ #include "Emu/Cell/Modules/sceNp2.h" #include "Utilities/Thread.h" #include -#include #include #include @@ -134,8 +133,8 @@ private: void retire_all_packets(std::shared_ptr& si); void stop_sig_nl(u32 conn_id); - std::mutex data_mutex; - std::condition_variable wakey; + shared_mutex data_mutex; + atomic_t wakey; signaling_packet sig_packet{};