diff --git a/Utilities/mutex.cpp b/Utilities/mutex.cpp
index 3ac8a1688b..ae0436c188 100644
--- a/Utilities/mutex.cpp
+++ b/Utilities/mutex.cpp
@@ -42,6 +42,84 @@ void shared_mutex::imp_unlock_shared(u32 old)
 	}
 }
 
+void shared_mutex::imp_lock_low(u32 val)
+{
+	verify("shared_mutex underflow" HERE), val < c_err;
+
+	for (int i = 0; i < 10; i++)
+	{
+		busy_wait();
+
+		if (try_lock_low())
+		{
+			return;
+		}
+	}
+
+	// Acquire writer lock and downgrade
+	const u32 old = m_value.fetch_add(c_one);
+
+	if (old == 0)
+	{
+		lock_downgrade();
+		return;
+	}
+
+	verify("shared_mutex overflow" HERE), (old % c_sig) + c_one < c_sig;
+	imp_wait();
+	lock_downgrade();
+}
+
+void shared_mutex::imp_unlock_low(u32 old)
+{
+	verify("shared_mutex underflow" HERE), old - 1 < c_err;
+
+	// Check reader count, notify the writer if necessary
+	if ((old - 1) % c_vip == 0)
+	{
+		imp_signal();
+	}
+}
+
+void shared_mutex::imp_lock_vip(u32 val)
+{
+	verify("shared_mutex underflow" HERE), val < c_err;
+
+	for (int i = 0; i < 10; i++)
+	{
+		busy_wait();
+
+		if (try_lock_vip())
+		{
+			return;
+		}
+	}
+
+	// Acquire writer lock and downgrade
+	const u32 old = m_value.fetch_add(c_one);
+
+	if (old == 0)
+	{
+		lock_downgrade_to_vip();
+		return;
+	}
+
+	verify("shared_mutex overflow" HERE), (old % c_sig) + c_one < c_sig;
+	imp_wait();
+	lock_downgrade_to_vip();
+}
+
+void shared_mutex::imp_unlock_vip(u32 old)
+{
+	verify("shared_mutex underflow" HERE), old - 1 < c_err;
+
+	// Check reader count, notify the writer if necessary
+	if ((old - 1) % c_one / c_vip == 0)
+	{
+		imp_signal();
+	}
+}
+
 void shared_mutex::imp_wait()
 {
 	while (!balanced_wait_until(m_value, -1, [&](u32& value, auto...)
@@ -158,3 +236,18 @@ void shared_mutex::imp_lock_unlock()
 	imp_wait();
 	unlock();
 }
+
+bool shared_mutex::downgrade_unique_vip_lock_to_low_or_unlock()
+{
+	return m_value.atomic_op([](u32& value)
+	{
+		if (value % c_one / c_vip == 1)
+		{
+			value -= c_vip - 1;
+			return true;
+		}
+
+		value -= c_vip;
+		return false;
+	});
+}
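The functions above rely on m_value packing three counters into one 32-bit word. The following standalone sketch is not part of the patch; the helper names are hypothetical and only the constants are taken from mutex.h (further below), but it makes the modulo arithmetic in imp_unlock_low and imp_unlock_vip easier to follow:

	#include <cstdint>

	// Constants as declared in mutex.h.
	constexpr std::uint32_t c_vip = 1u << 7;
	constexpr std::uint32_t c_one = 1u << 14;

	// Hypothetical helpers: m_value packs three counters into one word.
	constexpr std::uint32_t low_readers(std::uint32_t v) { return v % c_vip; }         // bits 0..6
	constexpr std::uint32_t vip_readers(std::uint32_t v) { return v % c_one / c_vip; } // bits 7..13
	constexpr std::uint32_t writers(std::uint32_t v)     { return v / c_one; }         // bits 14 and up

	// One writer waiting, one vip reader and three low readers held:
	static_assert(low_readers(c_one + c_vip + 3) == 3);
	static_assert(vip_readers(c_one + c_vip + 3) == 1);
	static_assert(writers(c_one + c_vip + 3) == 1);

This is the arithmetic the unlock paths use to decide whether a waiting writer should be signalled: imp_unlock_low signals once the last low reader is gone, and imp_unlock_vip signals once no reader of either kind remains.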
diff --git a/Utilities/mutex.h b/Utilities/mutex.h
index c77489f5d9..787a61a9a9 100644
--- a/Utilities/mutex.h
+++ b/Utilities/mutex.h
@@ -12,12 +12,17 @@ class shared_mutex final
 		c_one = 1u << 14, // Fixed-point 1.0 value (one writer, max_readers = c_one - 1)
 		c_sig = 1u << 30,
 		c_err = 1u << 31,
+		c_vip = 1u << 7,
 	};
 
 	atomic_t<u32> m_value{};
 
 	void imp_lock_shared(u32 val);
 	void imp_unlock_shared(u32 old);
+	void imp_lock_low(u32 val);
+	void imp_unlock_low(u32 old);
+	void imp_lock_vip(u32 val);
+	void imp_unlock_vip(u32 old);
 	void imp_wait();
 	void imp_signal();
 	void imp_lock(u32 val);
@@ -83,6 +88,64 @@ public:
 		}
 	}
 
+	bool try_lock_low()
+	{
+		const u32 value = m_value.load();
+
+		// Conditional increment
+		return value < c_vip - 1 && m_value.compare_and_swap_test(value, value + 1);
+	}
+
+	void lock_low()
+	{
+		const u32 value = m_value.load();
+
+		if (UNLIKELY(value >= c_vip - 1 || !m_value.compare_and_swap_test(value, value + 1)))
+		{
+			imp_lock_low(value);
+		}
+	}
+
+	void unlock_low()
+	{
+		// Unconditional decrement (can result in broken state)
+		const u32 value = m_value.fetch_sub(1);
+
+		if (UNLIKELY(value >= c_one))
+		{
+			imp_unlock_low(value);
+		}
+	}
+
+	bool try_lock_vip()
+	{
+		const u32 value = m_value.load();
+
+		// Conditional increment
+		return value < c_one - 1 && (value % c_vip) == 0 && m_value.compare_and_swap_test(value, value + c_vip);
+	}
+
+	void lock_vip()
+	{
+		const u32 value = m_value.load();
+
+		if (UNLIKELY(value >= c_one - 1 || (value % c_vip) || !m_value.compare_and_swap_test(value, value + c_vip)))
+		{
+			imp_lock_vip(value);
+		}
+	}
+
+	void unlock_vip()
+	{
+		// Unconditional decrement (can result in broken state)
+		const u32 value = m_value.fetch_sub(c_vip);
+
+		if (UNLIKELY(value >= c_one))
+		{
+			imp_unlock_vip(value);
+		}
+	}
+
 	bool try_lock()
 	{
 		return m_value.compare_and_swap_test(0, c_one);
@@ -151,6 +214,12 @@ public:
 		m_value -= c_one - 1;
 	}
 
+	void lock_downgrade_to_vip()
+	{
+		// Convert to vip lock (can result in broken state)
+		m_value -= c_one - c_vip;
+	}
+
 	// Optimized wait for lockability without locking, relaxed
 	void lock_unlock()
 	{
@@ -171,9 +240,12 @@ public:
 	{
 		return m_value.load() < c_one - 1;
 	}
+
+	// Special purpose logic
+	bool downgrade_unique_vip_lock_to_low_or_unlock();
 };
 
-// Simplified shared (reader) lock implementation.
+// Simplified shared (reader) lock implementation. Mutually incompatible with low_lock and vip_lock.
 class reader_lock final
 {
 	shared_mutex& m_mutex;
@@ -211,3 +283,47 @@ public:
 		m_upgraded ? m_mutex.unlock() : m_mutex.unlock_shared();
 	}
 };
+
+// Special shared (reader) lock, mutually exclusive with vip locks. Mutually incompatible with normal shared (reader) lock.
+class low_lock final
+{
+	shared_mutex& m_mutex;
+
+public:
+	low_lock(const low_lock&) = delete;
+
+	low_lock& operator=(const low_lock&) = delete;
+
+	explicit low_lock(shared_mutex& mutex)
+		: m_mutex(mutex)
+	{
+		m_mutex.lock_low();
+	}
+
+	~low_lock()
+	{
+		m_mutex.unlock_low();
+	}
+};
+
+// Special shared (reader) lock, mutually exclusive with low locks. Mutually incompatible with normal shared (reader) lock.
+class vip_lock final
+{
+	shared_mutex& m_mutex;
+
+public:
+	vip_lock(const vip_lock&) = delete;
+
+	vip_lock& operator=(const vip_lock&) = delete;
+
+	explicit vip_lock(shared_mutex& mutex)
+		: m_mutex(mutex)
+	{
+		m_mutex.lock_vip();
+	}
+
+	~vip_lock()
+	{
+		m_mutex.unlock_vip();
+	}
+};
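A hedged usage sketch of the new RAII guards (g_sample_lock, the three functions, and the include path are illustrative assumptions, not part of the patch). Low and vip readers exclude each other, multiple readers of the same kind can share the lock, and a writer excludes everything:

	#include <mutex>

	#include "Utilities/mutex.h" // assumed include path within the repository

	shared_mutex g_sample_lock;

	void low_priority_reader()
	{
		low_lock lock(g_sample_lock); // waits while vip readers or writers are present
		// ... read shared state ...
	}

	void vip_reader()
	{
		vip_lock lock(g_sample_lock); // waits while low readers or writers are present
		// ... read shared state ...
	}

	void exclusive_writer()
	{
		std::lock_guard lock(g_sample_lock); // excludes readers of both kinds
		// ... modify shared state ...
	}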
-> null" HERE), g_cpu_array[array_slot].exchange(nullptr) == this; g_cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64)); g_cpu_array_sema--; - g_cpu_array_lock.wait_all(); + g_cpu_suspend_lock.lock_unlock(); } void cpu_thread::on_abort() @@ -294,7 +288,7 @@ bool cpu_thread::check_state() noexcept else { // If only cpu_flag::pause was set, notification won't arrive - g_cpu_array_lock.wait_all(); + g_cpu_suspend_lock.lock_unlock(); } } @@ -336,25 +330,14 @@ std::string cpu_thread::dump() const } cpu_thread::suspend_all::suspend_all(cpu_thread* _this) noexcept - : m_lock(g_cpu_array_lock.try_shared_lock()) - , m_this(_this) + : m_this(_this) { - // TODO - if (!m_lock) - { - LOG_FATAL(GENERAL, "g_cpu_array_lock: too many concurrent accesses"); - Emu.Pause(); - return; - } - if (m_this) { m_this->state += cpu_flag::wait; } - g_cpu_pause_ctr++; - - reader_lock lock(g_cpu_pause_lock); + g_cpu_suspend_lock.lock_vip(); for_all_cpu([](cpu_thread* cpu) { @@ -387,33 +370,18 @@ cpu_thread::suspend_all::suspend_all(cpu_thread* _this) noexcept cpu_thread::suspend_all::~suspend_all() { // Make sure the latest thread does the cleanup and notifies others - u64 pause_ctr = 0; - - while ((pause_ctr = g_cpu_pause_ctr), !g_cpu_array_lock.wait_all(m_lock)) + if (g_cpu_suspend_lock.downgrade_unique_vip_lock_to_low_or_unlock()) { - if (pause_ctr) + for_all_cpu([&](cpu_thread* cpu) { - std::lock_guard lock(g_cpu_pause_lock); + cpu->state -= cpu_flag::pause; + }); - // Detect possible unfortunate reordering of flag clearing after suspend_all's reader lock - if (g_cpu_pause_ctr != pause_ctr) - { - continue; - } - - for_all_cpu([&](cpu_thread* cpu) - { - if (g_cpu_pause_ctr == pause_ctr) - { - cpu->state -= cpu_flag::pause; - } - }); - } - - if (g_cpu_array_lock.notify_all(m_lock)) - { - break; - } + g_cpu_suspend_lock.unlock_low(); + } + else + { + g_cpu_suspend_lock.lock_unlock(); } if (m_this) diff --git a/rpcs3/Emu/CPU/CPUThread.h b/rpcs3/Emu/CPU/CPUThread.h index 7bc69f9ac9..4d2d6e9c90 100644 --- a/rpcs3/Emu/CPU/CPUThread.h +++ b/rpcs3/Emu/CPU/CPUThread.h @@ -2,7 +2,6 @@ #include "../Utilities/Thread.h" #include "../Utilities/bit_set.h" -#include "../Utilities/cond.h" // Thread state flags enum class cpu_flag : u32 @@ -106,8 +105,6 @@ public: // Thread locker class suspend_all { - decltype(std::declval().try_shared_lock()) m_lock; - cpu_thread* m_this; public: