From 9fb8d449fec6d4bc7cb53b5a3a544aba2f662ccd Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Fri, 6 Nov 2020 00:06:58 +0300 Subject: [PATCH] atomic.cpp: more improvements Add ref counters for internal semaphores for concurrent notifying. Lack of them could result in loss of notification. --- rpcs3/util/atomic.cpp | 226 +++++++++++++++++++++++++----------------- 1 file changed, 134 insertions(+), 92 deletions(-) diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp index 411f4ea013..00476ce268 100644 --- a/rpcs3/util/atomic.cpp +++ b/rpcs3/util/atomic.cpp @@ -183,14 +183,14 @@ namespace #endif } -// Max allowed thread number is chosen to fit in 15 bits -static std::aligned_storage_t s_cond_list[INT16_MAX]{}; +// Max allowed thread number is chosen to fit in 16 bits +static std::aligned_storage_t s_cond_list[UINT16_MAX]{}; // Used to allow concurrent notifying -static atomic_t s_cond_refs[INT16_MAX]{}; +static atomic_t s_cond_refs[UINT16_MAX + 1]{}; // Allocation bits -static atomic_t s_cond_bits[::align(INT16_MAX, 64) / 64]{}; +static atomic_t s_cond_bits[::align(UINT16_MAX, 64) / 64]{}; // Allocation semaphore static atomic_t s_cond_sema{0}; @@ -198,7 +198,7 @@ static atomic_t s_cond_sema{0}; static u32 cond_alloc() { // Determine whether there is a free slot or not - if (!s_cond_sema.try_inc(INT16_MAX + 1)) + if (!s_cond_sema.try_inc(UINT16_MAX + 1)) { return 0; } @@ -236,6 +236,9 @@ static u32 cond_alloc() // Construct inplace before it can be used new (s_cond_list + id) cond_handle(); + // Add first reference + verify(HERE), !s_cond_refs[id]++; + return id + 1; } } @@ -247,7 +250,7 @@ static u32 cond_alloc() static cond_handle* cond_get(u32 cond_id) { - if (cond_id - 1 < u32{INT16_MAX}) [[likely]] + if (cond_id - 1 < u32{UINT16_MAX}) [[likely]] { return std::launder(reinterpret_cast(s_cond_list + (cond_id - 1))); } @@ -257,12 +260,18 @@ static cond_handle* cond_get(u32 cond_id) static void cond_free(u32 cond_id) { - if (cond_id - 1 >= u32{INT16_MAX}) + if (cond_id - 1 >= u32{UINT16_MAX}) { fprintf(stderr, "cond_free(): bad id %u" HERE "\n", cond_id); std::abort(); } + // Dereference, destroy on last ref + if (--s_cond_refs[cond_id - 1]) + { + return; + } + // Call the destructor cond_get(cond_id)->~cond_handle(); @@ -273,6 +282,46 @@ static void cond_free(u32 cond_id) s_cond_sema--; } +static u32 cond_lock(atomic_t* sema) +{ + while (const u32 cond_id = sema->load()) + { + const auto [old, ok] = s_cond_refs[cond_id - 1].fetch_op([](u16& ref) + { + if (!ref || ref == UINT16_MAX) + { + // Don't reference already deallocated semaphore + return false; + } + + ref++; + return true; + }); + + if (ok) + { + return cond_id; + } + + if (old == UINT16_MAX) + { + fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " for a single address reached in atomic notifier."); + } + + if (sema->load() != cond_id) + { + // Try again if it changed + continue; + } + else + { + break; + } + } + + return 0; +} + namespace { #define MAX_THREADS (56) @@ -284,12 +333,14 @@ namespace // Reference counter, owning pointer, collision bit and optionally selected slot atomic_t addr_ref{}; - // Semaphores (allocated in reverse order), 0 means empty, 0x8000 is notifier lock + private: + // Semaphores (allocated in reverse order), empty are zeros atomic_t sema_data[MAX_THREADS]{}; // Allocated semaphore bits (to make total size 128) atomic_t sema_bits{}; + public: atomic_t* sema_alloc() { const auto [bits, ok] = sema_bits.fetch_op([](u64& bits) @@ -307,7 +358,7 @@ namespace if (ok) [[likely]] { // Find lowest clear bit - return &sema_data[std::countr_one(bits)]; + return get_sema(std::countr_one(bits)); } // TODO: support extension if reached @@ -315,6 +366,25 @@ namespace return nullptr; } + atomic_t* get_sema(u32 id) + { + verify(HERE), id < MAX_THREADS; + + return &sema_data[(MAX_THREADS - 1) - id]; + } + + u64 get_sema_bits() const + { + return sema_bits & ((1ull << MAX_THREADS) - 1); + } + + void reset_sema_bit(atomic_t* sema) + { + verify(HERE), sema >= sema_data && sema < std::end(sema_data); + + sema_bits &= ~(1ull << ((MAX_THREADS - 1) - (sema - sema_data))); + } + void sema_free(atomic_t* sema) { if (sema < sema_data || sema >= std::end(sema_data)) @@ -323,19 +393,11 @@ namespace std::abort(); } - const u32 cond_id = sema->fetch_and(0x8000); - - if (!cond_id || cond_id >> 15) - { - // Delegated cleanup - return; - } - - // Free - cond_free(cond_id); + // Try to deallocate semaphore (may be delegated to a notifier) + cond_free(sema->exchange(0)); // Clear sema bit - sema_bits &= ~(1ull << (sema - sema_data)); + reset_sema_bit(sema); } }; @@ -607,7 +669,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti if (cond_id == 0) { - fmt::raw_error("Thread limit " STRINGIZE(INT16_MAX) " reached in atomic wait."); + fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in atomic wait."); } auto sema = slot->sema_alloc(); @@ -790,29 +852,20 @@ __vectorcall #endif alert_sema(atomic_t* sema, const void* data, u64 info, u32 size, __m128i mask, __m128i new_value) { - auto [cond_id, ok] = sema->fetch_op([](u16& id) - { - // Check if not zero and not locked - if (!id || id & 0x8000) - { - return false; - } + const u32 cond_id = cond_lock(sema); - // Set notify lock - id |= 0x8000; - return true; - }); - - if (!ok) [[unlikely]] + if (!cond_id) { return false; } const auto cond = cond_get(cond_id); - ok = false; + verify(HERE), cond; - if (cond && cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))) + bool ok = false; + + if (cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))) { if ((!size && cond->forced_wakeup()) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2))) { @@ -841,18 +894,8 @@ alert_sema(atomic_t* sema, const void* data, u64 info, u32 size, __m128i ma } } - // Remove lock, check if cond_id is already removed (leaving only 0x8000) - if (sema->fetch_and(0x7fff) == 0x8000) - { - cond_free(cond_id); - - // Cleanup, a little hacky obtainment of the host variable - const auto slot = std::launder(reinterpret_cast(reinterpret_cast(sema) & -128)); - - // Remove slot bit - slot->sema_bits &= ~(1ull << (sema - slot->sema_data)); - } - + // Remove lock, possibly deallocate cond + cond_free(cond_id); return ok; } @@ -895,9 +938,9 @@ bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id) u64 progress = 0; - for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1) + for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1) { - const auto sema = &slot->sema_data[std::countr_zero(bits)]; + const auto sema = slot->get_sema(std::countr_zero(bits)); // Forced notification if (alert_sema(sema, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128())) @@ -937,9 +980,9 @@ atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m12 u64 progress = 0; - for (u64 bits = slot->sema_bits; bits; bits &= bits - 1) + for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1) { - const auto sema = &slot->sema_data[std::countr_zero(bits)]; + const auto sema = slot->get_sema(std::countr_zero(bits)); if (alert_sema(sema, data, progress, size, mask, new_value)) { @@ -969,14 +1012,9 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 s_tls_notify_cb(data, 0); u64 progress = 0; - -#if defined(_WIN32) && !defined(USE_FUTEX) - // Special path for Windows 7 - if (!NtAlertThreadByThreadId) -#elif defined(USE_STD) { // Make a copy to filter out waiters that fail some checks - u64 copy = slot->sema_bits.load(); + u64 copy = slot->get_sema_bits(); u64 lock = 0; u32 lock_ids[64]{}; @@ -984,20 +1022,9 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 { const u32 id = std::countr_zero(bits); - const auto sema = &slot->sema_data[id]; + const auto sema = slot->get_sema(id); - auto [cond_id, ok] = sema->fetch_op([](u16& id) - { - if (!id || id & 0x8000) - { - return false; - } - - id |= 0x8000; - return true; - }); - - if (ok) + if (const u32 cond_id = cond_lock(sema)) { // Add lock bit for cleanup lock |= 1ull << id; @@ -1005,10 +1032,13 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 const auto cond = cond_get(cond_id); - if (cond && cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)) + verify(HERE), cond; + + if (cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)) { if (cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)) { + // Ok. continue; } } @@ -1025,11 +1055,12 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 { const u32 id = std::countr_zero(bits); - const auto sema = &slot->sema_data[id]; - -#ifdef USE_STD + const auto sema = slot->get_sema(id); const auto cond = cond_get(lock_ids[id]); +#if defined(USE_FUTEX) + continue; +#elif defined(USE_STD) // Optimistic non-blocking path if (cond->mtx.try_lock()) { @@ -1041,13 +1072,19 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 continue; } #elif defined(_WIN32) + if (NtAlertThreadByThreadId) + { + continue; + } + + static LARGE_INTEGER instant{}; + if (NtReleaseKeyedEvent(nullptr, sema, 1, &instant) != NTSTATUS_SUCCESS) { // Failed to notify immediately continue; } #endif - s_tls_notify_cb(data, ++progress); // Remove the bit from next stage @@ -1058,13 +1095,27 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 // Proceed with remaining bits using "normal" blocking waiting for (u64 bits = copy; bits; bits &= bits - 1) { -#ifdef USE_STD - const auto cond = cond_get(lock_ids[std::countr_zero(bits)]); + const u32 id = std::countr_zero(bits); + + const auto sema = slot->get_sema(id); + const auto cond = cond_get(lock_ids[id]); + +#if defined(USE_FUTEX) + // Always alerted (result isn't meaningful here) + futex(&cond->sync, FUTEX_WAKE_PRIVATE, 0x7fff'ffff); +#elif defined(USE_STD) cond->mtx.lock(); cond->mtx.unlock(); cond->cond.notify_all(); #elif defined(_WIN32) - NtReleaseKeyedEvent(nullptr, sema, 1, nullptr); + if (NtAlertThreadByThreadId) + { + NtAlertThreadByThreadId(cond->tid); + } + else + { + NtReleaseKeyedEvent(nullptr, sema, 1, nullptr); + } #endif s_tls_notify_cb(data, ++progress); } @@ -1072,26 +1123,17 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12 // Cleanup locked notifiers for (u64 bits = lock; bits; bits &= bits - 1) { - const u32 id = std::countr_zero(bits); - - const auto sema = &slot->sema_data[id]; - - if (sema->fetch_and(0x7fff) == 0x8000) - { - cond_free(lock_ids[id]); - - slot->sema_bits &= ~(1ull << id); - } + cond_free(lock_ids[std::countr_zero(bits)]); } s_tls_notify_cb(data, -1); return; } -#endif - for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1) + // Unused, let's keep for reference + for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1) { - const auto sema = &slot->sema_data[std::countr_zero(bits)]; + const auto sema = slot->get_sema(std::countr_zero(bits)); if (alert_sema(sema, data, progress, size, mask, new_value)) {