diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp
index fe6fe0c1e4..e2224deefa 100644
--- a/Utilities/Thread.cpp
+++ b/Utilities/Thread.cpp
@@ -1778,11 +1778,13 @@ void thread_base::finalize() noexcept
 	--g_thread_count;
 }
 
-bool thread_ctrl::_wait_for(u64 usec)
+void thread_ctrl::_wait_for(u64 usec)
 {
 	auto _this = g_tls_this_thread;
 
-	do
+	std::unique_lock lock(_this->m_mutex, std::defer_lock);
+
+	while (true)
 	{
 		// Mutex is unlocked at the start and after the waiting
 		if (u32 sig = _this->m_signal.load())
@@ -1790,17 +1792,20 @@ bool thread_ctrl::_wait_for(u64 usec)
 			if (sig & 1)
 			{
 				_this->m_signal &= ~1;
-				return true;
+				return;
 			}
 		}
 
 		if (usec == 0)
 		{
 			// No timeout: return immediately
-			return false;
+			return;
 		}
 
-		_this->m_mutex.lock();
+		if (!lock)
+		{
+			lock.lock();
+		}
 
 		// Double-check the value
 		if (u32 sig = _this->m_signal.load())
@@ -1808,15 +1813,17 @@ bool thread_ctrl::_wait_for(u64 usec)
 			if (sig & 1)
 			{
 				_this->m_signal &= ~1;
-				_this->m_mutex.unlock();
-				return true;
+				return;
 			}
 		}
-	}
-	while (_this->m_cond.wait_unlock(std::exchange(usec, usec > cond_variable::max_timeout ? -1 : 0), _this->m_mutex));
 
-	// Timeout
-	return false;
+		_this->m_cond.wait_unlock(usec, lock);
+
+		if (usec < cond_variable::max_timeout)
+		{
+			usec = 0;
+		}
+	}
 }
 
 thread_base::thread_base(std::string_view name)
diff --git a/Utilities/Thread.h b/Utilities/Thread.h
index d37a13a464..c8ea405388 100644
--- a/Utilities/Thread.h
+++ b/Utilities/Thread.h
@@ -185,7 +185,7 @@ class thread_ctrl final
 	static atomic_t<native_core_arrangement> g_native_core_layout;
 
 	// Internal waiting function, may throw. Infinite value is -1.
-	static bool _wait_for(u64 usec);
+	static void _wait_for(u64 usec);
 
 	friend class thread_base;
 
@@ -235,9 +235,9 @@ public:
 	}
 
-	// Wait once with timeout. May spuriously return false.
-	static inline bool wait_for(u64 usec)
+	// Wait once with timeout. May return spuriously.
+	static inline void wait_for(u64 usec)
 	{
-		return _wait_for(usec);
+		_wait_for(usec);
 	}
 
 	// Wait.
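Note on the `_wait_for` rework above: the mutex is now wrapped in a `std::unique_lock` constructed with `std::defer_lock`, taken lazily by the first iteration that needs it, and kept across iterations, so RAII performs the single unlock on every return path. Below is a minimal sketch of that pattern, with a plain `std::mutex`/`std::condition_variable` standing in for RPCS3's custom primitives; all names are illustrative, not part of the codebase.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool signaled = false;

void wait_for_signal()
{
	// Deferred: constructed unlocked, like 'lock' in _wait_for above
	std::unique_lock lock(m, std::defer_lock);

	while (true)
	{
		if (!lock)       // operator bool: do we own the mutex yet?
		{
			lock.lock(); // lock at most once; later iterations reuse it
		}

		if (signaled)    // check the flag under the lock
		{
			return;      // RAII releases the mutex exactly once here
		}

		cv.wait(lock);   // unlocks while sleeping, relocks on wakeup
	}
}

int main()
{
	std::thread t(wait_for_signal);

	{
		std::lock_guard lk(m);
		signaled = true;
	}

	cv.notify_one();
	t.join();
}
```

Compared to the old do/while, this avoids a lock/unlock pair per iteration and removes the unlock-before-return special case in the double-check branch.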
diff --git a/Utilities/cond.cpp b/Utilities/cond.cpp
index 64c0515323..fba6eee3ea 100644
--- a/Utilities/cond.cpp
+++ b/Utilities/cond.cpp
@@ -8,50 +8,57 @@
 #include <Windows.h>
 #endif
 
-bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
+// use constants, increase signal space
+
+void cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
 {
-	verify("cond_variable overflow" HERE), (_old & 0xffff) != 0xffff; // Very unlikely: it requires 65535 distinct threads to wait simultaneously
+	// Not supposed to fail
+	verify(HERE), _old;
 
-	return balanced_wait_until(m_value, _timeout, [&](u32& value, auto... ret) -> int
+	// Wait with timeout
+	m_value.wait(_old, atomic_wait_timeout{_timeout > max_timeout ? UINT64_MAX : _timeout * 1000});
+
+	// Cleanup
+	m_value.atomic_op([](u32& value)
 	{
-		if (value >> 16)
-		{
-			// Success
-			value -= 0x10001;
-			return +1;
-		}
+		value -= c_waiter_mask & -c_waiter_mask;
 
-		if constexpr (sizeof...(ret))
+		if ((value & c_waiter_mask) == 0)
 		{
-			// Retire
-			value -= 1;
-			return -1;
+			// Last waiter removed, clean signals
+			value = 0;
 		}
-
-		return 0;
 	});
-
-#ifdef _WIN32
-	if (_old >= 0x10000 && !OptWaitOnAddress && m_value)
-	{
-		// Workaround possibly stolen signal
-		imp_wake(1);
-	}
-#endif
 }
 
 void cond_variable::imp_wake(u32 _count) noexcept
 {
-	// TODO (notify_one)
-	balanced_awaken(m_value, m_value.atomic_op([&](u32& value) -> u32
+	const auto [_old, ok] = m_value.fetch_op([](u32& value)
 	{
-		// Subtract already signaled number from total amount of waiters
-		const u32 can_sig = (value & 0xffff) - (value >> 16);
-		const u32 num_sig = std::min(can_sig, _count);
+		if (!value || (value & c_signal_mask) == c_signal_mask)
+		{
+			return false;
+		}
 
-		value += num_sig << 16;
-		return num_sig;
-	}));
+		// Add signal
+		value += c_signal_mask & -c_signal_mask;
+		return true;
+	});
+
+	if (!ok || !_count)
+	{
+		return;
+	}
+
+	if (_count > 1 || ((_old + (c_signal_mask & -c_signal_mask)) & c_signal_mask) == c_signal_mask)
+	{
+		// Resort to notify_all if signal count reached max
+		m_value.notify_all();
+	}
+	else
+	{
+		m_value.notify_one();
+	}
 }
 
 bool shared_cond::imp_wait(u32 slot, u64 _timeout) noexcept
diff --git a/Utilities/cond.h b/Utilities/cond.h
index 01a23d83eb..1e6d2f7b62 100644
--- a/Utilities/cond.h
+++ b/Utilities/cond.h
@@ -11,9 +11,31 @@ class cond_variable
 	// Internal waiter counter
 	atomic_t<u32> m_value{0};
 
+	enum : u32
+	{
+		c_waiter_mask = 0x1fff,
+		c_signal_mask = 0xffffffff & ~c_waiter_mask,
+	};
+
 protected:
+	// Increment waiter count
+	u32 add_waiter() noexcept
+	{
+		return m_value.atomic_op([](u32& value) -> u32
+		{
+			if ((value & c_signal_mask) == c_signal_mask || (value & c_waiter_mask) == c_waiter_mask)
+			{
+				// Signal or waiter overflow, return immediately
+				return 0;
+			}
+
+			value += c_waiter_mask & -c_waiter_mask;
+			return value;
+		});
+	}
+
 	// Internal waiting function
-	bool imp_wait(u32 _old, u64 _timeout) noexcept;
+	void imp_wait(u32 _old, u64 _timeout) noexcept;
 
 	// Try to notify up to _count threads
 	void imp_wake(u32 _count) noexcept;
@@ -23,22 +45,33 @@ public:
 
 	// Intrusive wait algorithm for lockable objects
 	template <typename T>
-	bool wait(T& object, u64 usec_timeout = -1)
+	void wait(T& object, u64 usec_timeout = -1) noexcept
 	{
-		const u32 _old = m_value.fetch_add(1); // Increment waiter counter
+		const u32 _old = add_waiter();
+
+		if (!_old)
+		{
+			return;
+		}
+
 		object.unlock();
-		const bool res = imp_wait(_old, usec_timeout);
+		imp_wait(_old, usec_timeout);
 		object.lock();
-		return res;
 	}
 
 	// Unlock all specified objects but don't lock them again
 	template <typename... Locks>
-	bool wait_unlock(u64 usec_timeout, Locks&&... locks)
+	void wait_unlock(u64 usec_timeout, Locks&&... locks)
 	{
-		const u32 _old = m_value.fetch_add(1); // Increment waiter counter
+		const u32 _old = add_waiter();
 		(..., std::forward<Locks>(locks).unlock());
-		return imp_wait(_old, usec_timeout);
+
+		if (!_old)
+		{
+			return;
+		}
+
+		imp_wait(_old, usec_timeout);
 	}
 
 	// Wake one thread
@@ -55,11 +88,11 @@ public:
 	{
 		if (m_value)
 		{
-			imp_wake(65535);
+			imp_wake(-1);
 		}
 	}
 
-	static constexpr u64 max_timeout = u64{UINT32_MAX} / 1000 * 1000000;
+	static constexpr u64 max_timeout = UINT64_MAX / 1000;
 };
 
 // Condition variable fused with a pseudo-mutex supporting only reader locks (up to 32 readers).
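The new `cond.h` layout packs both counters into the single 32-bit atomic: the low 13 bits (`c_waiter_mask`) count waiters, the remaining high bits count signals, and `mask & -mask` isolates the lowest set bit of a mask, i.e. the increment for one unit of that field (1 for a waiter, 0x2000 for a signal). A standalone sketch of the arithmetic, with the masks copied from the diff and everything else illustrative:

```cpp
#include <cstdint>
#include <cstdio>

enum : uint32_t
{
	c_waiter_mask = 0x1fff,                      // low 13 bits: up to 8191 waiters
	c_signal_mask = 0xffffffff & ~c_waiter_mask, // high 19 bits: signal count
};

int main()
{
	// 'mask & -mask' extracts the lowest set bit: the unit of each field
	constexpr uint32_t one_waiter = c_waiter_mask & -c_waiter_mask; // 1
	constexpr uint32_t one_signal = c_signal_mask & -c_signal_mask; // 0x2000

	uint32_t value = 0;
	value += one_waiter; // add_waiter(): two threads enter
	value += one_waiter;
	value += one_signal; // imp_wake(): one signal posted

	std::printf("waiters=%u signals=%u\n",
		value & c_waiter_mask, (value & c_signal_mask) / one_signal);

	value -= one_waiter; // imp_wait() cleanup: one waiter leaves

	if ((value & c_waiter_mask) == 0)
	{
		value = 0; // last waiter gone: discard stale signals
	}

	return 0;
}
```

Note the saturation semantics this creates: `add_waiter()` returns 0 when either field is full, so `wait()`/`wait_unlock()` return immediately instead of corrupting the counter, and `imp_wake()` falls back to `notify_all()` once the signal field would reach its maximum.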
diff --git a/rpcs3/Emu/Cell/lv2/sys_sync.h b/rpcs3/Emu/Cell/lv2/sys_sync.h
index 9a68261f47..af2acefe3c 100644
--- a/rpcs3/Emu/Cell/lv2/sys_sync.h
+++ b/rpcs3/Emu/Cell/lv2/sys_sync.h
@@ -236,10 +236,10 @@ public:
 		static_assert(UINT64_MAX / cond_variable::max_timeout >= g_cfg.core.clocks_scale.max, "timeout may overflow during scaling");
 
 		// Clamp to max timeout accepted
-		if (usec > cond_variable::max_timeout) usec = cond_variable::max_timeout;
+		const u64 max_usec = cond_variable::max_timeout * 100 / g_cfg.core.clocks_scale.max;
 
 		// Now scale the result
-		usec = (usec * g_cfg.core.clocks_scale) / 100;
+		usec = (std::min(usec, max_usec) * g_cfg.core.clocks_scale) / 100;
 
 #ifdef __linux__
 		// TODO: Confirm whether Apple or any BSD can benefit from this as well
@@ -271,7 +271,7 @@ public:
 			// Do not wait for the last quantum to avoid loss of accuracy
 			thread_ctrl::wait_for(remaining - ((remaining % host_min_quantum) + host_min_quantum));
#else
-			// Wait on multiple of min quantum for large durations to avoid overloading low thread cpus
+			// Wait in multiples of the min quantum for large durations to avoid overloading CPUs with few threads
 			thread_ctrl::wait_for(remaining - (remaining % host_min_quantum));
 #endif
 		}
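On the sys_sync.h change: with `cond_variable::max_timeout` now close to `UINT64_MAX / 1000`, clamping before the multiplication is what keeps `usec * clocks_scale` from wrapping, and it also guarantees the scaled result never exceeds `max_timeout`. A hedged sketch of the arithmetic; `scale_max = 1000` is an assumed bound for `g_cfg.core.clocks_scale.max`, not a value taken from the config:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// From the diff: cond_variable::max_timeout
constexpr uint64_t max_timeout = UINT64_MAX / 1000;

// Assumption: upper bound of g_cfg.core.clocks_scale (a percentage)
constexpr uint64_t scale_max = 1000;

uint64_t scale_timeout(uint64_t usec, uint64_t scale /* <= scale_max */)
{
	// Clamp first so the product below cannot overflow:
	// max_usec * scale <= max_timeout * 100 < UINT64_MAX
	const uint64_t max_usec = max_timeout * 100 / scale_max;

	return std::min(usec, max_usec) * scale / 100;
}

int main()
{
	// An effectively infinite timeout scaled at 300% no longer wraps
	std::printf("%llu\n",
		static_cast<unsigned long long>(scale_timeout(UINT64_MAX, 300)));
}
```

The old order (clamp to `max_timeout`, then scale) was only safe while `max_timeout` was far below `UINT64_MAX / scale_max`; the context `static_assert` documents the invariant the new constant has to keep.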