diff --git a/Utilities/Atomic.h b/Utilities/Atomic.h index d40e9e6f1c..7404d08c6c 100644 --- a/Utilities/Atomic.h +++ b/Utilities/Atomic.h @@ -300,21 +300,24 @@ struct atomic_storage : atomic_storage static inline bool bts(T& dest, uint bit) { bool result; - __asm__("lock btsw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ushort _bit = (ushort)bit; + __asm__("lock btsw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } static inline bool btr(T& dest, uint bit) { bool result; - __asm__("lock btrw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ushort _bit = (ushort)bit; + __asm__("lock btrw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } static inline bool btc(T& dest, uint bit) { bool result; - __asm__("lock btcw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ushort _bit = (ushort)bit; + __asm__("lock btcw %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } #endif @@ -498,21 +501,24 @@ struct atomic_storage : atomic_storage static inline bool bts(T& dest, uint bit) { bool result; - __asm__("lock btsq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ullong _bit = bit; + __asm__("lock btsq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } static inline bool btr(T& dest, uint bit) { bool result; - __asm__("lock btrq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ullong _bit = bit; + __asm__("lock btrq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } static inline bool btc(T& dest, uint bit) { bool result; - __asm__("lock btcq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (bit) : "cc"); + ullong _bit = bit; + __asm__("lock btcq %2, %0\n" "setc %1" : "+m" (dest), "=r" (result) : "Ir" (_bit) : "cc"); return result; } #endif diff --git a/Utilities/File.cpp b/Utilities/File.cpp index 479aefc1a2..f89f570cf6 100644 --- a/Utilities/File.cpp +++ b/Utilities/File.cpp @@ -1,6 +1,6 @@ #include "File.h" +#include "mutex.h" #include "StrFmt.h" -#include "SharedMutex.h" #include "BEType.h" #include "Crypto/sha1.h" diff --git a/Utilities/Semaphore.cpp b/Utilities/Semaphore.cpp deleted file mode 100644 index 76fab20e08..0000000000 --- a/Utilities/Semaphore.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#include "Utilities/Semaphore.h" - -#include -#include - -struct benaphore::internal -{ - std::mutex mutex; - - std::size_t acq_order{}; - std::size_t rel_order{}; - - std::condition_variable cond; -}; - -void benaphore::wait_hard() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - // Notify non-zero waiter queue size - if (m_value.exchange(-1) == 1) - { - // Return immediately (acquired) - m_value = 0; - return; - } - - // Remember the order - const std::size_t order = ++m_data->acq_order; - - // Wait for the appropriate rel_order (TODO) - while (m_data->rel_order < order) - { - m_data->cond.wait(lock); - } - - if (order == m_data->acq_order && m_data->acq_order == m_data->rel_order) - { - // Cleaup - m_data->acq_order = 0; - m_data->rel_order = 0; - m_value.compare_and_swap(-1, 0); - } -} - -void benaphore::post_hard() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - if (m_value.compare_and_swap(0, 1) != -1) - { - // Do nothing (released) - return; - } - - if (m_data->acq_order == m_data->rel_order) - { - m_value = 1; - return; - } - - // Awake one thread - m_data->rel_order += 1; - - // Unlock and notify - lock.unlock(); - m_data->cond.notify_one(); -} - -void benaphore::initialize_once() -{ - if (UNLIKELY(!m_data)) - { - auto ptr = new benaphore::internal; - - if (!m_data.compare_and_swap_test(nullptr, ptr)) - { - delete ptr; - } - } -} - -benaphore::~benaphore() -{ - delete m_data; -} diff --git a/Utilities/Semaphore.h b/Utilities/Semaphore.h deleted file mode 100644 index cd8bdfcd41..0000000000 --- a/Utilities/Semaphore.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include "types.h" -#include "Atomic.h" - -// Binary semaphore -class benaphore -{ - struct internal; - - // Reserved value (-1) enforces *_hard() calls - atomic_t m_value{}; - - atomic_t m_data{}; - - void wait_hard(); - void post_hard(); - -public: - constexpr benaphore() = default; - - ~benaphore(); - - // Initialize internal data - void initialize_once(); - - void wait() - { - if (UNLIKELY(!m_value.compare_and_swap_test(1, 0))) - { - wait_hard(); - } - } - - bool try_wait() - { - return m_value.compare_and_swap_test(1, 0); - } - - void post() - { - if (UNLIKELY(!m_value.compare_and_swap_test(0, 1))) - { - post_hard(); - } - } -}; diff --git a/Utilities/SharedMutex.cpp b/Utilities/SharedMutex.cpp deleted file mode 100644 index 1a97b8d472..0000000000 --- a/Utilities/SharedMutex.cpp +++ /dev/null @@ -1,187 +0,0 @@ -#include "SharedMutex.h" - -#include -#include - -struct shared_mutex::internal -{ - std::mutex mutex; - - std::size_t rq_size{}; // Reader queue size (threads waiting on m_rcv) - std::size_t wq_size{}; // Writer queue size (threads waiting on m_wcv and m_ocv) - - std::condition_variable rcv; // Reader queue - std::condition_variable wcv; // Writer queue - std::condition_variable ocv; // For current exclusive owner -}; - -void shared_mutex::lock_shared_hard() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - // Validate - if ((m_ctrl & SM_INVALID_BIT) != 0) throw std::runtime_error("shared_mutex::lock_shared(): Invalid bit"); - if ((m_ctrl & SM_READER_MASK) == 0) throw std::runtime_error("shared_mutex::lock_shared(): No readers"); - - // Notify non-zero reader queue size - m_ctrl |= SM_WAITERS_BIT, m_data->rq_size++; - - // Fix excess reader count - if ((--m_ctrl & SM_READER_MASK) == 0 && m_data->wq_size) - { - // Notify exclusive owner - m_data->ocv.notify_one(); - } - - // Obtain the reader lock - while (true) - { - const auto ctrl = m_ctrl.load(); - - // Check writers and reader limit - if (m_data->wq_size || (ctrl & ~SM_WAITERS_BIT) >= SM_READER_MAX) - { - m_data->rcv.wait(lock); - continue; - } - - if (m_ctrl.compare_and_swap_test(ctrl, ctrl + 1)) - { - break; - } - } - - if (!--m_data->rq_size && !m_data->wq_size) - { - m_ctrl &= ~SM_WAITERS_BIT; - } -} - -void shared_mutex::unlock_shared_notify() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - if ((m_ctrl & SM_READER_MASK) == 0 && m_data->wq_size) - { - // Notify exclusive owner - lock.unlock(); - m_data->ocv.notify_one(); - } - else if (m_data->rq_size) - { - // Notify other readers - lock.unlock(); - m_data->rcv.notify_one(); - } -} - -void shared_mutex::lock_hard() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - // Validate - if ((m_ctrl & SM_INVALID_BIT) != 0) throw std::runtime_error("shared_mutex::lock(): Invalid bit"); - - // Notify non-zero writer queue size - m_ctrl |= SM_WAITERS_BIT, m_data->wq_size++; - - // Obtain the writer lock - while (true) - { - const auto ctrl = m_ctrl.load(); - - if (ctrl & SM_WRITER_LOCK) - { - m_data->wcv.wait(lock); - continue; - } - - if (m_ctrl.compare_and_swap_test(ctrl, ctrl | SM_WRITER_LOCK)) - { - break; - } - } - - // Wait for remaining readers - while ((m_ctrl & SM_READER_MASK) != 0) - { - m_data->ocv.wait(lock); - } - - if (!--m_data->wq_size && !m_data->rq_size) - { - m_ctrl &= ~SM_WAITERS_BIT; - } -} - -void shared_mutex::unlock_notify() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - if (m_data->wq_size) - { - // Notify next exclusive owner - lock.unlock(); - m_data->wcv.notify_one(); - } - else if (m_data->rq_size) - { - // Notify all readers - lock.unlock(); - m_data->rcv.notify_all(); - } -} - -void shared_mutex::lock_upgrade_hard() -{ - unlock_shared(); - lock(); -} - -void shared_mutex::lock_degrade_hard() -{ - initialize_once(); - - std::unique_lock lock(m_data->mutex); - - m_ctrl -= SM_WRITER_LOCK - 1; - - if (m_data->rq_size) - { - // Notify all readers - lock.unlock(); - m_data->rcv.notify_all(); - } - else if (m_data->wq_size) - { - // Notify next exclusive owner - lock.unlock(); - m_data->wcv.notify_one(); - } -} - -void shared_mutex::initialize_once() -{ - if (UNLIKELY(!m_data)) - { - auto ptr = new shared_mutex::internal; - - if (!m_data.compare_and_swap_test(nullptr, ptr)) - { - delete ptr; - } - } -} - -shared_mutex::~shared_mutex() -{ - delete m_data; -} diff --git a/Utilities/SharedMutex.h b/Utilities/SharedMutex.h deleted file mode 100644 index fe01558bdf..0000000000 --- a/Utilities/SharedMutex.h +++ /dev/null @@ -1,179 +0,0 @@ -#pragma once - -#include "types.h" -#include "Atomic.h" - -//! An attempt to create effective implementation of "shared mutex", lock-free in optimistic case. -//! All locking and unlocking may be done by a single LOCK XADD or LOCK CMPXCHG instruction. -//! MSVC implementation of std::shared_timed_mutex seems suboptimal. -//! std::shared_mutex is not available until C++17. -class shared_mutex final -{ - enum : u32 - { - SM_WRITER_LOCK = 1u << 31, // Exclusive lock flag, must be MSB - SM_WAITERS_BIT = 1u << 30, // Flag set if m_wq_size or m_rq_size is non-zero - SM_INVALID_BIT = 1u << 29, // Unreachable reader count bit (may be set by incorrect unlock_shared() call) - - SM_READER_MASK = SM_WAITERS_BIT - 1, // Valid reader count bit mask - SM_READER_MAX = 1u << 24, // Max reader count - }; - - atomic_t m_ctrl{}; // Control variable: reader count | SM_* flags - - struct internal; - - atomic_t m_data{}; // Internal data - - void lock_shared_hard(); - void unlock_shared_notify(); - - void lock_hard(); - void unlock_notify(); - - void lock_upgrade_hard(); - void lock_degrade_hard(); - -public: - constexpr shared_mutex() = default; - - // Initialize internal data - void initialize_once(); - - ~shared_mutex(); - - bool try_lock_shared() - { - const u32 ctrl = m_ctrl.load(); - - return ctrl < SM_READER_MAX && m_ctrl.compare_and_swap_test(ctrl, ctrl + 1); - } - - void lock_shared() - { - // Optimization: unconditional increment, compensated later - if (UNLIKELY(m_ctrl++ >= SM_READER_MAX)) - { - lock_shared_hard(); - } - } - - void unlock_shared() - { - if (UNLIKELY(m_ctrl-- >= SM_READER_MAX)) - { - unlock_shared_notify(); - } - } - - bool try_lock() - { - return !m_ctrl && m_ctrl.compare_and_swap_test(0, SM_WRITER_LOCK); - } - - void lock() - { - if (UNLIKELY(!m_ctrl.compare_and_swap_test(0, SM_WRITER_LOCK))) - { - lock_hard(); - } - } - - void unlock() - { - m_ctrl &= ~SM_WRITER_LOCK; - - if (UNLIKELY(m_ctrl)) - { - unlock_notify(); - } - } - - bool try_lock_upgrade() - { - return m_ctrl == 1 && m_ctrl.compare_and_swap_test(1, SM_WRITER_LOCK); - } - - bool try_lock_degrade() - { - return m_ctrl == SM_WRITER_LOCK && m_ctrl.compare_and_swap_test(SM_WRITER_LOCK, 1); - } - - void lock_upgrade() - { - if (UNLIKELY(!m_ctrl.compare_and_swap_test(1, SM_WRITER_LOCK))) - { - lock_upgrade_hard(); - } - } - - void lock_degrade() - { - if (UNLIKELY(!m_ctrl.compare_and_swap_test(SM_WRITER_LOCK, 1))) - { - lock_degrade_hard(); - } - } -}; - -//! Simplified shared (reader) lock implementation. -//! std::shared_lock may be used instead if necessary. -class reader_lock final -{ - shared_mutex& m_mutex; - -public: - reader_lock(const reader_lock&) = delete; - - reader_lock(shared_mutex& mutex) - : m_mutex(mutex) - { - m_mutex.lock_shared(); - } - - ~reader_lock() - { - m_mutex.unlock_shared(); - } -}; - -//! Simplified exclusive (writer) lock implementation. -//! std::lock_guard may or std::unique_lock be used instead if necessary. -class writer_lock final -{ - shared_mutex& m_mutex; - -public: - writer_lock(const writer_lock&) = delete; - - writer_lock(shared_mutex& mutex) - : m_mutex(mutex) - { - m_mutex.lock(); - } - - ~writer_lock() - { - m_mutex.unlock(); - } -}; - -// Exclusive (writer) lock in the scope of shared (reader) lock. -class upgraded_lock final -{ - shared_mutex& m_mutex; - -public: - upgraded_lock(const writer_lock&) = delete; - - upgraded_lock(shared_mutex& mutex) - : m_mutex(mutex) - { - m_mutex.lock_upgrade(); - } - - ~upgraded_lock() - { - m_mutex.lock_degrade(); - } -}; diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index 1722cc114f..34249ade2d 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -2043,7 +2043,7 @@ void thread_ctrl::push_atexit(task_stack task) thread_ctrl::thread_ctrl(std::string&& name) : m_name(std::move(name)) { - CHECK_STORAGE(std::thread, m_thread); + static_assert(sizeof(std::thread) <= sizeof(m_thread), "Small storage"); #pragma push_macro("new") #undef new diff --git a/Utilities/cond.cpp b/Utilities/cond.cpp new file mode 100644 index 0000000000..ec85991bfc --- /dev/null +++ b/Utilities/cond.cpp @@ -0,0 +1,107 @@ +#include "cond.h" +#include "sync.h" + +bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept +{ + verify(HERE), _old != -1; // Very unlikely: it requires 2^32 distinct threads to wait simultaneously + +#ifdef _WIN32 + LARGE_INTEGER timeout; + timeout.QuadPart = _timeout * -10; + + if (HRESULT rc = NtWaitForKeyedEvent(nullptr, &m_value, false, _timeout == -1 ? nullptr : &timeout)) + { + verify(HERE), rc == WAIT_TIMEOUT; + + // Retire + if (!m_value.fetch_op([](u32& value) { if (value) value--; })) + { + NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr); + return true; + } + + return false; + } + + return true; +#elif __linux__ + timespec timeout; + timeout.tv_sec = _timeout / 1000000; + timeout.tv_nsec = (_timeout % 1000000) * 1000; + + for (u32 value = _old + 1;; value = m_value) + { + const int err = futex((int*)&m_value.raw(), FUTEX_WAIT_PRIVATE, value, _timeout == -1 ? nullptr : &timeout, nullptr, 0) == 0 + ? 0 + : errno; + + // Normal or timeout wakeup + if (!err || (_timeout != -1 && err == ETIMEDOUT)) + { + // Cleanup (remove waiter) + verify(HERE), m_value--; + return !err; + } + + // Not a wakeup + verify(HERE), err == EAGAIN; + } +#else + // TODO + std::this_thread::sleep_for(std::chrono::microseconds(50)); + verify(HERE), m_value--; + return true; +#endif +} + +void cond_variable::imp_wake(u32 _count) noexcept +{ +#ifdef _WIN32 + // Try to subtract required amount of waiters + const u32 count = m_value.atomic_op([=](u32& value) + { + if (value > _count) + { + value -= _count; + return _count; + } + + return std::exchange(value, 0); + }); + + for (u32 i = count; i > 0; i--) + { + NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); + } +#elif __linux__ + for (u32 i = _count; i > 0; sched_yield()) + { + const u32 value = m_value; + + // Constrain remaining amount with imaginary waiter count + if (i > value) + { + i = value; + } + + if (!value || i == 0) + { + // Nothing to do + return; + } + + if (const int res = futex((int*)&m_value.raw(), FUTEX_WAKE_PRIVATE, i > INT_MAX ? INT_MAX : i, nullptr, nullptr, 0)) + { + verify(HERE), res >= 0 && res <= i; + i -= res; + } + + if (!m_value || i == 0) + { + // Escape + return; + } + } +#endif +} + diff --git a/Utilities/cond.h b/Utilities/cond.h new file mode 100644 index 0000000000..00c58f02e8 --- /dev/null +++ b/Utilities/cond.h @@ -0,0 +1,50 @@ +#pragma once + +#include "types.h" +#include "Atomic.h" + +// Lightweight condition variable +class cond_variable +{ + // Internal waiter counter + atomic_t m_value{0}; + +protected: + // Internal waiting function + bool imp_wait(u32 _old, u64 _timeout) noexcept; + + // Try to notify up to _count threads + void imp_wake(u32 _count) noexcept; + +public: + constexpr cond_variable() = default; + + // Intrusive wait algorithm for lockable objects + template + explicit_bool_t wait(T& object, u64 usec_timeout = -1) + { + const u32 _old = m_value.fetch_add(1); // Increment waiter counter + (object.*Unlock)(); + const bool res = imp_wait(_old, usec_timeout); + (object.*Lock)(); + return res; + } + + // Wake one thread + void notify_one() noexcept + { + if (m_value) + { + imp_wake(1); + } + } + + // Wake all threads + void notify_all() noexcept + { + if (m_value) + { + imp_wake(-1); + } + } +}; diff --git a/Utilities/mutex.cpp b/Utilities/mutex.cpp new file mode 100644 index 0000000000..aa7da909c8 --- /dev/null +++ b/Utilities/mutex.cpp @@ -0,0 +1,245 @@ +#include "mutex.h" +#include "sync.h" + +#ifdef _WIN32 +thread_local const u32 owned_mutex::g_tid = GetCurrentThreadId(); +#elif __linux__ +#include +thread_local const u32 owned_mutex::g_tid = syscall(SYS_gettid) + 1; +static_assert(sizeof(pid_t) == sizeof(u32), "Unexpected sizeof(pid_t)"); +#else + +#include + +thread_local const u32 owned_mutex::g_tid = []() -> u32 +{ + static std::mutex g_tid_mutex; + static std::vector g_tid_map(1); + + thread_local const struct tid_alloc + { + u32 id = 0; + + tid_alloc() + { + std::lock_guard lock(g_tid_mutex); + + // Allocate + while (++id < g_tid_map.size()) + { + if (!g_tid_map[id]) + { + g_tid_map[id] = true; + return; + } + } + + g_tid_map.push_back(true); + } + + ~tid_alloc() + { + std::lock_guard lock(g_tid_mutex); + + // Erase + g_tid_map[id] = false; + } + } g_tid; + + return g_tid.id; +}(); +#endif + +void shared_mutex::imp_lock_shared(s64 _old) +{ + verify("shared_mutex overflow" HERE), _old <= c_max; + + // 1) Wait as a writer, notify the next writer + // 2) Wait as a reader, until the value > 0 + lock(); + _old = m_value.fetch_add(c_one - c_min); + + if (_old) + { + imp_unlock(_old); + } + +#ifdef _WIN32 + if (_old + c_one - c_min < 0) + { + NtWaitForKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr); + } +#else + for (s64 value = m_value; value < 0; value = m_value) + { + if (futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_PRIVATE, value >> 32, nullptr, nullptr, 0) == -1) + { + verify(HERE), errno == EAGAIN; + } + } +#endif +} + +void shared_mutex::imp_unlock_shared(s64 _old) +{ + verify("shared_mutex overflow" HERE), _old + c_min <= c_max; + + // Check reader count, notify the writer if necessary (set c_sig) + if ((_old + c_min) % c_one == 0) // TODO + { + verify(HERE), !atomic_storage::bts(m_value.raw(), 0); +#ifdef _WIN32 + NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); +#else + verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0; +#endif + } +} + +void shared_mutex::imp_lock(s64 _old) +{ + verify("shared_mutex overflow" HERE), _old <= c_max; + +#ifdef _WIN32 + NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr); + verify(HERE), atomic_storage::btr(m_value.raw(), 0); +#else + for (s64 value = m_value; (m_value & c_sig) == 0 || !atomic_storage::btr(m_value.raw(), 0); value = m_value) + { + if (futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0) == -1) + { + verify(HERE), errno == EAGAIN; + } + } +#endif +} + +void shared_mutex::imp_unlock(s64 _old) +{ + verify("shared_mutex overflow" HERE), _old + c_one <= c_max; + + // 1) Notify the next writer if necessary (set c_sig) + // 2) Notify all readers otherwise if necessary + if (_old + c_one <= 0) + { + verify(HERE), !atomic_storage::bts(m_value.raw(), 0); +#ifdef _WIN32 + NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); +#else + verify(HERE), futex((int*)&m_value.raw() + IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0) >= 0; +#endif + } + else if (s64 count = -_old / c_min) + { +#ifdef _WIN32 + while (count--) + { + NtReleaseKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr); + } +#else + verify(HERE), futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0) >= 0; +#endif + } +} + +void shared_mutex::imp_lock_upgrade() +{ + unlock_shared(); + lock(); +} + +void shared_mutex::imp_lock_degrade() +{ + unlock(); + lock_shared(); +} + +bool shared_mutex::try_lock_shared() +{ + // Conditional decrement + return m_value.fetch_op([](s64& value) { if (value >= c_min) value -= c_min; }) >= c_min; +} + +bool shared_mutex::try_lock() +{ + // Conditional decrement (TODO: obtain c_sig) + return m_value.compare_and_swap_test(c_one, 0); +} + +bool shared_mutex::try_lock_upgrade() +{ + // TODO + return m_value.compare_and_swap_test(c_one - c_min, 0); +} + +bool shared_mutex::try_lock_degrade() +{ + // TODO + return m_value.compare_and_swap_test(0, c_one - c_min); +} + +bool owned_mutex::lock() noexcept +{ + if (m_value && m_owner == g_tid) + { + return false; + } + +#ifdef _WIN32 + if (m_value++) + { + NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr); + } + + m_owner.store(g_tid); +#else + u32 _last = ++m_value; + + if (_last == 1 && m_owner.compare_and_swap_test(0, g_tid)) + { + return true; + } + + while (!m_owner.compare_and_swap_test(0, g_tid)) + { + if (futex((int*)&m_value.raw(), FUTEX_WAIT_PRIVATE, _last, nullptr, nullptr, 0)) + { + _last = m_value.load(); + } + } +#endif + + return true; +} + +bool owned_mutex::try_lock() noexcept +{ + if (m_value || !m_value.compare_and_swap_test(0, 1)) + { + return false; + } + + m_owner.store(g_tid); + return true; +} + +bool owned_mutex::unlock() noexcept +{ + if (UNLIKELY(m_owner != g_tid)) + { + return false; + } + + m_owner.store(0); + + if (--m_value) + { +#ifdef _WIN32 + NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); +#else + futex((int*)&m_value.raw(), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); +#endif + } + + return true; +} diff --git a/Utilities/mutex.h b/Utilities/mutex.h new file mode 100644 index 0000000000..05b3c84ab5 --- /dev/null +++ b/Utilities/mutex.h @@ -0,0 +1,218 @@ +#pragma once + +#include "types.h" +#include "Atomic.h" + +// Shared mutex. +class shared_mutex final +{ + enum : s64 + { + c_one = 1ull << 31, // Fixed-point 1.0 value (one writer) + c_min = 0x00000002, // Fixed-point 1.0/max_readers value + c_sig = 0x00000001, + c_max = c_one + }; + + atomic_t m_value{c_one}; // Semaphore-alike counter + + void imp_lock_shared(s64 _old); + void imp_unlock_shared(s64 _old); + + void imp_lock(s64 _old); + void imp_unlock(s64 _old); + + void imp_lock_upgrade(); + void imp_lock_degrade(); + +public: + constexpr shared_mutex() = default; + + bool try_lock_shared(); + + void lock_shared() + { + const s64 value = m_value.load(); + + // Fast path: decrement if positive + if (UNLIKELY(value < c_min || value > c_one || !m_value.compare_and_swap_test(value, value - c_min))) + { + imp_lock_shared(value); + } + } + + void unlock_shared() + { + // Unconditional increment + const s64 value = m_value.fetch_add(c_min); + + if (value < 0 || value > c_one - c_min) + { + imp_unlock_shared(value); + } + } + + bool try_lock(); + + void lock() + { + // Unconditional decrement + const s64 value = m_value.fetch_sub(c_one); + + if (value != c_one) + { + imp_lock(value); + } + } + + void unlock() + { + // Unconditional increment + const s64 value = m_value.fetch_add(c_one); + + if (value != 0) + { + imp_unlock(value); + } + } + + bool try_lock_upgrade(); + + void lock_upgrade() + { + // TODO + if (!m_value.compare_and_swap_test(c_one - c_min, 0)) + { + imp_lock_upgrade(); + } + } + + bool try_lock_degrade(); + + void lock_degrade() + { + // TODO + if (!m_value.compare_and_swap_test(0, c_one - c_min)) + { + imp_lock_degrade(); + } + } +}; + +// Simplified shared (reader) lock implementation, std::shared_lock compatible. +class reader_lock final +{ + shared_mutex& m_mutex; + +public: + reader_lock(const reader_lock&) = delete; + + explicit reader_lock(shared_mutex& mutex) + : m_mutex(mutex) + { + m_mutex.lock_shared(); + } + + ~reader_lock() + { + m_mutex.unlock_shared(); + } +}; + +// Simplified exclusive (writer) lock implementation, std::lock_guard compatible. +class writer_lock final +{ + shared_mutex& m_mutex; + +public: + writer_lock(const writer_lock&) = delete; + + explicit writer_lock(shared_mutex& mutex) + : m_mutex(mutex) + { + m_mutex.lock(); + } + + ~writer_lock() + { + m_mutex.unlock(); + } +}; + +// Exclusive (writer) lock in the scope of shared (reader) lock (experimental). +class upgraded_lock final +{ + shared_mutex& m_mutex; + +public: + upgraded_lock(const writer_lock&) = delete; + + explicit upgraded_lock(shared_mutex& mutex) + : m_mutex(mutex) + { + m_mutex.lock_upgrade(); + } + + ~upgraded_lock() + { + m_mutex.lock_degrade(); + } +}; + +// Normal mutex with owner registration. +class owned_mutex +{ + atomic_t m_value{0}; + atomic_t m_owner{0}; + +protected: + // Thread id + static thread_local const u32 g_tid; + +public: + constexpr owned_mutex() = default; + + // Returns false if current thread already owns the mutex. + bool lock() noexcept; + + // Returns false if locked by any thread. + bool try_lock() noexcept; + + // Returns false if current thread doesn't own the mutex. + bool unlock() noexcept; + + // Check state. + bool is_locked() const { return m_value != 0; } + + // Check owner. + bool is_owned() const { return m_owner == g_tid; } +}; + +// Recursive lock for owned_mutex (experimental). +class recursive_lock final +{ + owned_mutex& m_mutex; + const bool m_first; + +public: + recursive_lock(const recursive_lock&) = delete; + + explicit recursive_lock(owned_mutex& mutex) + : m_mutex(mutex) + , m_first(mutex.lock()) + { + } + + // Check whether the lock "owns" the mutex + explicit operator bool() const + { + return m_first; + } + + ~recursive_lock() + { + if (m_first && !m_mutex.unlock()) + { + } + } +}; diff --git a/Utilities/sema.cpp b/Utilities/sema.cpp new file mode 100644 index 0000000000..6043539cf1 --- /dev/null +++ b/Utilities/sema.cpp @@ -0,0 +1,162 @@ +#include "sema.h" +#include "sync.h" + +void semaphore_base::imp_wait(bool lsb) +{ + // 1. Obtain LSB, reset it + // 2. Notify other posters if necessary + +#ifdef _WIN32 + if (!lsb) + { + while ((m_value & 1) == 0 || !atomic_storage::btr(m_value.raw(), 0)) + { + // Wait infinitely until signaled + verify(HERE), NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr) == ERROR_SUCCESS; + } + } + + // Notify instantly + LARGE_INTEGER timeout; + timeout.QuadPart = 0; + if (HRESULT rc = NtReleaseKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout)) + { + verify(HERE), rc == WAIT_TIMEOUT; + } +#elif __linux__ + if (!lsb) + { + for (s32 value = m_value; (m_value & 1) == 0 || !atomic_storage::btr(m_value.raw(), 0); value = m_value) + { + if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, -2) == -1) + { + verify(HERE), errno == EAGAIN; + } + } + } + + verify(HERE), futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, 1) >= 0; +#else + if (lsb) + { + return; + } + + while ((m_value & 1) == 0 || !atomic_storage::btr(m_value.raw(), 0)) + { + std::this_thread::sleep_for(std::chrono::microseconds(50)); + } +#endif +} + +void semaphore_base::imp_post(s32 _old) +{ + verify("semaphore_base: overflow" HERE), _old < 0; + + // 1. Set LSB, waiting until it can be set if necessary + // 2. Notify waiter + +#ifdef _WIN32 + while ((_old & 1) == 0 && atomic_storage::bts(m_value.raw(), 0)) + { + static_assert(ERROR_SUCCESS == 0, "Unexpected ERROR_SUCCESS value"); + + LARGE_INTEGER timeout; + timeout.QuadPart = -500; // ~50us + if (HRESULT rc = NtWaitForKeyedEvent(nullptr, (u8*)&m_value + 2, false, &timeout)) + { + verify(HERE), rc == WAIT_TIMEOUT; + } + } + + LARGE_INTEGER timeout; + timeout.QuadPart = 0; // Instant for the first attempt + while (HRESULT rc = NtReleaseKeyedEvent(nullptr, &m_value, false, &timeout)) + { + verify(HERE), rc == WAIT_TIMEOUT; + + if (!timeout.QuadPart) + { + timeout.QuadPart = -500; // ~50us + NtDelayExecution(false, &timeout); + } + + if ((m_value & 1) == 0) + { + break; + } + } +#elif __linux__ + for (s32 value = m_value; (_old & 1) == 0 && atomic_storage::bts(m_value.raw(), 0); value = m_value) + { + if (futex(&m_value.raw(), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, 1) == -1) + { + verify(HERE), errno == EAGAIN; + } + } + + if (1 + 0 == verify(HERE, 1 + futex(&m_value.raw(), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, -2))) + { + usleep(50); + } +#else + if (_old & 1) + { + return; + } + + while (m_value & 1 || atomic_storage::bts(m_value.raw(), 0)) + { + std::this_thread::sleep_for(std::chrono::microseconds(50)); + } +#endif +} + +bool semaphore_base::try_wait() +{ + // Conditional decrement + const s32 value = m_value.fetch_op([](s32& value) + { + if (value > 0 || value & 1) + { + if (value <= 1) + { + value &= ~1; + } + + value -= 1 << 1; + } + }); + + if (value & 1 && value <= 1) + { + imp_wait(true); + return true; + } + + return value > 0 || value & 1; +} + +bool semaphore_base::try_post(s32 _max) +{ + // Conditional increment + const s32 value = m_value.fetch_op([&](s32& value) + { + if (value < _max) + { + if (value < 0) + { + value |= 1; + } + + value += 1 << 1; + } + }); + + if (value < 0) + { + imp_post(value ^ 1); + } + + return value < _max; +} diff --git a/Utilities/sema.h b/Utilities/sema.h new file mode 100644 index 0000000000..341370f913 --- /dev/null +++ b/Utilities/sema.h @@ -0,0 +1,110 @@ +#pragma once + +#include "types.h" +#include "Atomic.h" + +// Lightweight semaphore helper class +class semaphore_base +{ + // Semaphore value (shifted; negative values imply 0 with waiters, LSB is used to ping-pong signals between threads) + atomic_t m_value; + + void imp_wait(bool lsb); + + void imp_post(s32 _old); + +protected: + explicit constexpr semaphore_base(s32 value) + : m_value{value} + { + } + + void wait() + { + // Unconditional decrement + if (UNLIKELY(m_value.sub_fetch(1 << 1) < 0)) + { + imp_wait(false); + } + } + + bool try_wait(); + + void post(s32 _max) + { + // Unconditional increment + const s32 value = m_value.fetch_add(1 << 1); + + if (UNLIKELY(value < 0 || value >= _max)) + { + imp_post(value & ~1); + } + } + + bool try_post(s32 _max); + +public: + // Get current semaphore value + s32 get() const + { + // Load value + const s32 value = m_value; + + // Return only positive value + return value < 0 ? 0 : value >> 1; + } +}; + +// Lightweight semaphore template (default arguments define binary semaphore and Def == Max) +template +class semaphore final : public semaphore_base +{ + static_assert(Max >= 0 && Max < (1 << 30), "semaphore<>: Max is out of bounds"); + static_assert(Def >= 0 && Def < (1 << 30), "semaphore<>: Def is out of bounds"); + static_assert(Def <= Max, "semaphore<>: Def is too big"); + + using base = semaphore_base; + +public: + // Default constructor (recommended) + constexpr semaphore() + : base{Def << 1} + { + } + + // Explicit value constructor (not recommended) + explicit constexpr semaphore(s32 value) + : base{value << 1} + { + } + + // Obtain a semaphore + void wait() + { + return base::wait(); + } + + // Try to obtain a semaphore + explicit_bool_t try_wait() + { + return base::try_wait(); + } + + // Return a semaphore + void post() + { + return base::post(Max << 1); + } + + // Try to return a semaphore + explicit_bool_t try_post() + { + return base::try_post(Max << 1); + } + + // Get max semaphore value + static constexpr s32 size() + { + return Max; + } +}; diff --git a/Utilities/types.h b/Utilities/types.h index b94fe7a964..823fc6a36d 100644 --- a/Utilities/types.h +++ b/Utilities/types.h @@ -30,16 +30,12 @@ #define SAFE_BUFFERS #define NEVER_INLINE __attribute__((noinline)) #define FORCE_INLINE __attribute__((always_inline)) inline - -// Some platforms don't support thread_local well yet. -#define thread_local __thread #endif #define CHECK_SIZE(type, size) static_assert(sizeof(type) == size, "Invalid " #type " type size") #define CHECK_ALIGN(type, align) static_assert(alignof(type) == align, "Invalid " #type " type alignment") #define CHECK_MAX_SIZE(type, size) static_assert(sizeof(type) <= size, #type " type size is too big") #define CHECK_SIZE_ALIGN(type, size, align) CHECK_SIZE(type, size); CHECK_ALIGN(type, align) -#define CHECK_STORAGE(type, storage) static_assert(sizeof(type) <= sizeof(storage) && alignof(type) <= alignof(decltype(storage)), #type " is too small") // Return 32 bit sizeof() to avoid widening/narrowing conversions with size_t #define SIZE_32(...) static_cast(sizeof(__VA_ARGS__)) @@ -533,7 +529,7 @@ struct verify_impl // Verification (can be safely disabled) if (!verify_func()(std::forward(value))) { - fmt::raw_verify_error(cause, fmt::get_type_info(), N); + fmt::raw_verify_error(cause, nullptr, N); } return verify_impl{cause}; diff --git a/rpcs3/Emu/Cell/SPUAnalyser.h b/rpcs3/Emu/Cell/SPUAnalyser.h index 25f23c593a..aa29413a63 100644 --- a/rpcs3/Emu/Cell/SPUAnalyser.h +++ b/rpcs3/Emu/Cell/SPUAnalyser.h @@ -1,6 +1,6 @@ #pragma once -#include "Utilities/SharedMutex.h" +#include "Utilities/mutex.h" #include diff --git a/rpcs3/Emu/IPC.h b/rpcs3/Emu/IPC.h index f8f73d0916..5ff926c6e6 100644 --- a/rpcs3/Emu/IPC.h +++ b/rpcs3/Emu/IPC.h @@ -3,7 +3,7 @@ #include #include -#include "Utilities/SharedMutex.h" +#include "Utilities/mutex.h" // IPC manager for objects of type T and IPC keys of type K. // External declaration of g_ipc is required. diff --git a/rpcs3/Emu/IdManager.h b/rpcs3/Emu/IdManager.h index c42e3209d9..7bb8961dad 100644 --- a/rpcs3/Emu/IdManager.h +++ b/rpcs3/Emu/IdManager.h @@ -1,7 +1,7 @@ #pragma once #include "Utilities/types.h" -#include "Utilities/SharedMutex.h" +#include "Utilities/mutex.h" #include #include diff --git a/rpcs3/Emu/Memory/wait_engine.cpp b/rpcs3/Emu/Memory/wait_engine.cpp index 02144a84e6..fa96ab1393 100644 --- a/rpcs3/Emu/Memory/wait_engine.cpp +++ b/rpcs3/Emu/Memory/wait_engine.cpp @@ -4,7 +4,7 @@ #include "wait_engine.h" #include "Utilities/Thread.h" -#include "Utilities/SharedMutex.h" +#include "Utilities/mutex.h" #include diff --git a/rpcs3/emucore.vcxproj b/rpcs3/emucore.vcxproj index 9061e5e5df..fc69fc1fe0 100644 --- a/rpcs3/emucore.vcxproj +++ b/rpcs3/emucore.vcxproj @@ -73,6 +73,9 @@ NotUsing + + NotUsing + NotUsing @@ -84,11 +87,11 @@ NotUsing - - + NotUsing - + + NotUsing @@ -259,11 +262,7 @@ - NotUsing - NotUsing - NotUsing - NotUsing - NotUsing + NotUsing NotUsing @@ -394,20 +393,21 @@ + + + - - @@ -689,4 +689,4 @@ - \ No newline at end of file + diff --git a/rpcs3/emucore.vcxproj.filters b/rpcs3/emucore.vcxproj.filters index ec36d84c95..6f46a9e43d 100644 --- a/rpcs3/emucore.vcxproj.filters +++ b/rpcs3/emucore.vcxproj.filters @@ -182,9 +182,6 @@ Emu\GPU\RSX\Common - - Utilities - Utilities @@ -650,9 +647,6 @@ Emu - - Utilities - Emu\Memory @@ -887,6 +881,15 @@ Emu\GPU\RSX + + Utilities + + + Utilities + + + Utilities + @@ -1093,9 +1096,6 @@ Emu\GPU\RSX\Common - - Utilities - Emu\Cell @@ -1108,9 +1108,6 @@ Emu\Cell - - Utilities - Utilities @@ -1708,5 +1705,14 @@ Emu\GPU\RSX\Common + + Utilities + + + Utilities + + + Utilities + \ No newline at end of file