From 1bb7c74c93083728d80b4bf79d8798536acac462 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Thu, 5 Nov 2020 19:18:48 +0300
Subject: [PATCH] atomic.cpp: various cleanups and fixes

Add pointer comparison to notifiers (to prevent spurious wakeups).
Fix a bug with a possible double notification in raw_notify().
Fix a bug with incorrect allocation of bit slots for cond_handle.
Add a semaphore counter to track the max allowed number of threads.
Use #define for some constants so they can be STRINGIZEd in error messages.
Add error messages for when certain limits are reached.
Fix a bug where a wrong check simply called std::abort().
Use the "special" notify_all path with batch processing on every arch.
Fix a Win7 bug that probably no one noticed.
---
 rpcs3/util/atomic.cpp | 214 ++++++++++++++++++++++++------------------
 1 file changed, 125 insertions(+), 89 deletions(-)

diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index b40d7fef11..411f4ea013 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -80,6 +80,8 @@ ptr_cmp(const void* data, u32 size, __m128i old128, __m128i mask128)
 		{
 			return true;
 		}
+
+		break;
 	}
 	default:
 	{
@@ -138,7 +140,7 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
 namespace
 {
 	// Essentially a fat semaphore
-	struct cond_handle
+	struct alignas(64) cond_handle
 	{
 #ifdef _WIN32
 		u64 tid = GetCurrentThreadId();
@@ -147,6 +149,8 @@ namespace
 #endif
 		atomic_t<u32> sync{};
 		u32 size{};
+		u64 tsc0{};
+		const void* ptr{};
 		__m128i mask{};
 		__m128i oldv{};
@@ -155,22 +159,46 @@ namespace
 		std::condition_variable cond;
 		std::mutex mtx;
 #endif
+
+		bool forced_wakeup()
+		{
+			const auto [_old, ok] = sync.fetch_op([](u32& val)
+			{
+				if (val == 1 || val == 2)
+				{
+					val = 3;
+					return true;
+				}
+
+				return false;
+			});
+
+			// Prevent collision between normal wake-up and forced one
+			return ok && _old == 1;
+		}
 	};
+
+#ifndef USE_STD
+	static_assert(sizeof(cond_handle) == 64);
+#endif
 }
 
-// Arbitrary max allowed thread number (to fit in 15 bits)
-static constexpr u32 s_max_conds = 512 * 64 - 1;
+// Max allowed thread number is chosen to fit in 15 bits
+static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[INT16_MAX]{};
 
-static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[s_max_conds]{};
+// Used to allow concurrent notifying
+static atomic_t<u32> s_cond_refs[INT16_MAX]{};
 
-atomic_t<u64> s_cond_bits[s_max_conds / 64];
+// Allocation bits
+static atomic_t<u64> s_cond_bits[::align(INT16_MAX, 64) / 64]{};
 
-atomic_t<u32> s_cond_sema{0};
+// Allocation semaphore
+static atomic_t<u32> s_cond_sema{0};
 
 static u32 cond_alloc()
 {
 	// Determine whether there is a free slot or not
-	if (!s_cond_sema.try_inc(s_max_conds + 1))
+	if (!s_cond_sema.try_inc(INT16_MAX + 1))
 	{
 		return 0;
 	}
@@ -184,9 +212,9 @@ static u32 cond_alloc()
 	const u32 start = __rdtsc();
 #endif
 
-	for (u32 i = start * 8;; i++)
+	for (u32 i = start;; i++)
 	{
-		const u32 group = i % (s_max_conds / 64);
+		const u32 group = i % ::size32(s_cond_bits);
 
 		const auto [bits, ok] = s_cond_bits[group].fetch_op([](u64& bits)
 		{
@@ -212,14 +240,14 @@ static u32 cond_alloc()
 		}
 	}
 
-	// TODO: unreachable
+	// Unreachable
 	std::abort();
 	return 0;
 }
 
 static cond_handle* cond_get(u32 cond_id)
 {
-	if (cond_id - 1 < s_max_conds) [[likely]]
+	if (cond_id - 1 < u32{INT16_MAX}) [[likely]]
 	{
 		return std::launder(reinterpret_cast<cond_handle*>(s_cond_list + (cond_id - 1)));
 	}
@@ -229,7 +257,7 @@ static cond_handle* cond_get(u32 cond_id)
 
 static void cond_free(u32 cond_id)
 {
-	if (cond_id - 1 >= s_max_conds)
+	if (cond_id - 1 >= u32{INT16_MAX})
 	{
 		fprintf(stderr, "cond_free(): bad id %u" HERE "\n", cond_id);
 		std::abort();
@@ -247,6 +275,8 @@ static void cond_free(u32 cond_id)
 
 namespace
 {
+#define MAX_THREADS (56)
+
 	struct alignas(128) sync_var
 	{
 		constexpr sync_var() noexcept = default;
@@ -254,17 +284,17 @@ namespace
 		// Reference counter, owning pointer, collision bit and optionally selected slot
 		atomic_t<u64> addr_ref{};
 
-		// Allocated semaphore bits (max 56, to make total size 128)
-		atomic_t<u64> sema_bits{};
+		// Semaphores (allocated in reverse order), 0 means empty, 0x8000 is notifier lock
+		atomic_t<u16> sema_data[MAX_THREADS]{};
 
-		// Semaphores (one per thread), data is platform-specific but 0 means empty
-		atomic_t<u16> sema_data[56]{};
+		// Allocated semaphore bits (to make total size 128)
+		atomic_t<u64> sema_bits{};
 
 		atomic_t<u16>* sema_alloc()
 		{
 			const auto [bits, ok] = sema_bits.fetch_op([](u64& bits)
 			{
-				if (bits + 1 < (1ull << 56))
+				if (bits + 1 < (1ull << MAX_THREADS))
 				{
 					// Set lowest clear bit
 					bits |= bits + 1;
@@ -280,6 +310,8 @@ namespace
 				return &sema_data[std::countr_one(bits)];
 			}
 
+			// TODO: support extension if reached
+			fmt::raw_error("Thread limit " STRINGIZE(MAX_THREADS) " for a single address reached in atomic wait.");
 			return nullptr;
 		}
@@ -287,6 +319,7 @@ namespace
 		{
 			if (sema < sema_data || sema >= std::end(sema_data))
 			{
+				fprintf(stderr, "sema_free(): bad sema ptr %p" HERE "\n", sema);
 				std::abort();
 			}
@@ -307,6 +340,8 @@ namespace
 	};
 
 	static_assert(sizeof(sync_var) == 128);
+
+#undef MAX_THREADS
 }
 
 // Main hashtable for atomic wait.
@@ -324,16 +359,28 @@ namespace
 }
 
-// Number of search groups (defines max slot branch count as gcount * 64)
-static constexpr u32 s_slot_gcount = (s_hashtable_power > 7 ? 4096 : 256) / 64;
+#define MAX_SLOTS (4096)
 
 // Array of slot branch objects
-alignas(128) static slot_info s_slot_list[s_slot_gcount * 64]{};
+alignas(128) static slot_info s_slot_list[MAX_SLOTS]{};
 
 // Allocation bits
-static atomic_t<u64> s_slot_bits[s_slot_gcount]{};
+static atomic_t<u64> s_slot_bits[MAX_SLOTS / 64]{};
+
+// Allocation semaphore
+static atomic_t<u32> s_slot_sema{0};
+
+static_assert(MAX_SLOTS % 64 == 0);
 
 static u64 slot_alloc()
 {
+	// Determine whether there is a free slot or not
+	if (!s_slot_sema.try_inc(MAX_SLOTS + 1))
+	{
+		fmt::raw_error("Hashtable extension slot limit " STRINGIZE(MAX_SLOTS) " reached in atomic wait.");
+		return 0;
+	}
+
 	// Diversify search start points to reduce contention and increase immediate success chance
 #ifdef _WIN32
 	const u32 start = GetCurrentProcessorNumber();
@@ -343,9 +390,9 @@ static u64 slot_alloc()
 	const u32 start = __rdtsc();
 #endif
 
-	for (u32 i = 0;; i++)
+	for (u32 i = start;; i++)
 	{
-		const u32 group = (i + start * 8) % s_slot_gcount;
+		const u32 group = i % ::size32(s_slot_bits);
 
 		const auto [bits, ok] = s_slot_bits[group].fetch_op([](u64& bits)
 		{
@@ -366,11 +413,13 @@ static u64 slot_alloc()
 		}
 	}
 
-	// TODO: unreachable
+	// Unreachable
 	std::abort();
 	return 0;
 }
 
+#undef MAX_SLOTS
+
 static sync_var* slot_get(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
 {
 	if (!loc)
@@ -407,6 +456,9 @@ static void slot_free(u64 id)
 	// Reset allocation bit
 	id = (id & s_slot_mask) / one_v<s_slot_mask>;
 	s_slot_bits[id / 64] &= ~(1ull << (id % 64));
+
+	// Reset semaphore
+	s_slot_sema--;
 }
 
 static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
@@ -415,10 +467,8 @@ static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
 
 	if ((value & s_pointer_mask) != (iptr & s_pointer_mask))
 	{
-		if ((value & s_waiter_mask) == 0 || (value & s_collision_bit) == 0)
-		{
-			std::abort();
-		}
+		ASSERT(value & s_waiter_mask);
+		ASSERT(value & s_collision_bit);
 
 		// Get the number of leading equal bits to determine subslot
 		const u64 eq_bits = std::countl_zero((((iptr ^ value) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16);
@@ -430,7 +480,7 @@ static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
 	// Actual cleanup in reverse order
 	auto [_old, ok] = loc->addr_ref.fetch_op([&](u64& value)
 	{
-		if (value & s_waiter_mask)
+		ASSERT(value & s_waiter_mask);
 		{
 			value -= one_v<s_waiter_mask>;
@@ -443,15 +493,10 @@ static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
 
 			return 1;
 		}
-
-		std::abort();
 	});
 
 	if (ok > 1 && _old & s_collision_bit)
 	{
-		if (loc->sema_bits)
-			std::abort();
-
 		// Deallocate slot on last waiter
 		slot_free(_old);
 	}
@@ -562,7 +607,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
 
 	if (cond_id == 0)
 	{
-		fmt::raw_error("Thread limit (32767) reached in atomic wait.");
+		fmt::raw_error("Thread limit " STRINGIZE(INT16_MAX) " reached in atomic wait.");
 	}
 
 	auto sema = slot->sema_alloc();
@@ -584,13 +629,15 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
 	// Save for notifiers
 	const auto cond = cond_get(cond_id);
 
-	// Store some info for notifiers
+	// Store some info for notifiers (some may be unused)
 	cond->size = size;
 	cond->mask = mask;
 	cond->oldv = old_value;
+	cond->ptr = data;
+	cond->tsc0 = __rdtsc();
 
 	cond->sync = 1;
-	sema->release(cond_id);
+	sema->store(static_cast<u16>(cond_id));
 
 #ifdef USE_STD
 	// Lock mutex
@@ -604,7 +651,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
 	bool fallback = false;
 #endif
 
-	while (ptr_cmp(data, size, old_value, mask) && cond->sync != 3)
+	while (ptr_cmp(data, size, old_value, mask))
 	{
 #ifdef USE_FUTEX
 		struct timespec ts;
@@ -652,7 +699,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
 
 		if (fallback) [[unlikely]]
 		{
-			if (!cond->sync.compare_and_swap_test(2, 1))
+			if (cond->sync.load() == 3 || !cond->sync.compare_and_swap_test(2, 1))
 			{
 				fallback = false;
 				break;
@@ -751,15 +798,6 @@ alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i ma
 			return false;
 		}
 
-		// Dirty optimization: prevent attempting to lock dead or uninitialized sync vars
-		u32 sync_var = 0;
-		std::memcpy(&sync_var, reinterpret_cast<const char*>(s_cond_list) + (sizeof(cond_handle) * (id - 1) + offsetof(cond_handle, sync)), sizeof(sync_var));
-
-		if (!sync_var)
-		{
-			return false;
-		}
-
 		// Set notify lock
 		id |= 0x8000;
 		return true;
@@ -774,43 +812,30 @@ alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i ma
 
 	ok = false;
 
-	if (cond && cond->sync && (!size ? (!info || cond->tid == info) : cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)))
+	if (cond && cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)))
 	{
-		if ((!size && cond->sync.exchange(3) == 1) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)))
+		if ((!size && cond->forced_wakeup()) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)))
 		{
-			// Imminent notification
-			if (!size || !info)
-			{
-				s_tls_notify_cb(data, 0);
-			}
+			ok = true;
 
 #ifdef USE_FUTEX
 			// Use "wake all" arg for robustness, only 1 thread is expected
 			futex(&cond->sync, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
-			ok = true;
 #elif defined(USE_STD)
 			// Not super efficient: locking is required to avoid lost notifications
 			cond->mtx.lock();
 			cond->mtx.unlock();
 			cond->cond.notify_all();
-			ok = true;
 #elif defined(_WIN32)
 			if (NtWaitForAlertByThreadId)
 			{
-				if (NtAlertThreadByThreadId(cond->tid) == NTSTATUS_SUCCESS)
-				{
-					// Could be some dead thread otherwise
-					ok = true;
-				}
+				// Sets some sticky alert bit, at least I believe so
+				NtAlertThreadByThreadId(cond->tid);
 			}
 			else
 			{
 				// Can wait in rare cases, which is its annoying weakness
-				if (NtReleaseKeyedEvent(nullptr, sema, 1, nullptr) == NTSTATUS_SUCCESS)
-				{
-					// Can't fail
-					ok = true;
-				}
+				NtReleaseKeyedEvent(nullptr, sema, 1, nullptr);
 			}
 #endif
 		}
@@ -866,6 +891,8 @@ bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id)
 		return false;
 	}
 
+	s_tls_notify_cb(data, 0);
+
 	u64 progress = 0;
 
 	for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1)
@@ -906,6 +933,8 @@ atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m12
 		return;
 	}
 
+	s_tls_notify_cb(data, 0);
+
 	u64 progress = 0;
 
 	for (u64 bits = slot->sema_bits; bits; bits &= bits - 1)
@@ -922,7 +951,7 @@ atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m12
 	s_tls_notify_cb(data, -1);
 }
 
-void
+SAFE_BUFFERS void
 #ifdef _WIN32
 __vectorcall
 #endif
@@ -937,18 +966,19 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 		return;
 	}
 
+	s_tls_notify_cb(data, 0);
+
 	u64 progress = 0;
 
-#if defined(_WIN32) && !defined(USE_FUTEX) && !defined(USE_STD)
+#if defined(_WIN32) && !defined(USE_FUTEX)
 	// Special path for Windows 7
 	if (!NtAlertThreadByThreadId)
+#elif defined(USE_STD)
 	{
 		// Make a copy to filter out waiters that fail some checks
 		u64 copy = slot->sema_bits.load();
 		u64 lock = 0;
-		u32 lock_ids[56]{};
+		u32 lock_ids[64]{};
 
+#ifndef USE_STD
+		// Zero timeout, kept for the non-blocking keyed-event release below
 		static LARGE_INTEGER instant{};
+#endif
 
 		for (u64 bits = copy; bits; bits &= bits - 1)
 		{
@@ -963,15 +993,6 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 					return false;
 				}
 
-				u32 sync_var = 0;
-				std::memcpy(&sync_var, reinterpret_cast<const char*>(s_cond_list) + (sizeof(cond_handle) * (id - 1) + offsetof(cond_handle, sync)), sizeof(sync_var));
-
-				if (!sync_var)
-				{
-					return false;
-				}
-
-				// Set notify lock
 				id |= 0x8000;
 				return true;
 			});
@@ -984,15 +1005,10 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 
 			const auto cond = cond_get(cond_id);
 
-			if (cond && cond->sync && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))
+			if (cond && cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))
 			{
 				if (cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2))
 				{
-					if (bits == copy)
-					{
-						s_tls_notify_cb(data, 0);
-					}
-
 					continue;
 				}
 			}
@@ -1011,11 +1027,26 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 
 			const auto sema = &slot->sema_data[id];
 
+#ifdef USE_STD
+			const auto cond = cond_get(lock_ids[id]);
+
+			// Optimistic non-blocking path
+			if (cond->mtx.try_lock())
+			{
+				cond->mtx.unlock();
+				cond->cond.notify_all();
+			}
+			else
+			{
+				continue;
+			}
+#elif defined(_WIN32)
 			if (NtReleaseKeyedEvent(nullptr, sema, 1, &instant) != NTSTATUS_SUCCESS)
 			{
 				// Failed to notify immediately
 				continue;
 			}
+#endif
 
 			s_tls_notify_cb(data, ++progress);
@@ -1027,7 +1058,14 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 
 		// Proceed with remaining bits using "normal" blocking waiting
 		for (u64 bits = copy; bits; bits &= bits - 1)
 		{
-			NtReleaseKeyedEvent(nullptr, &slot->sema_data[std::countr_zero(bits)], 1, nullptr);
+#ifdef USE_STD
+			const auto cond = cond_get(lock_ids[std::countr_zero(bits)]);
+			cond->mtx.lock();
+			cond->mtx.unlock();
+			cond->cond.notify_all();
+#elif defined(_WIN32)
+			NtReleaseKeyedEvent(nullptr, &slot->sema_data[std::countr_zero(bits)], 1, nullptr);
+#endif
 
 			s_tls_notify_cb(data, ++progress);
 		}
@@ -1040,8 +1078,6 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
 
 		if (sema->fetch_and(0x7fff) == 0x8000)
 		{
-			const u32 id = std::countr_zero(bits);
-
 			cond_free(lock_ids[id]);
 			slot->sema_bits &= ~(1ull << id);
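A note for readers on the allocation pattern used above (not part of the diff itself): cond_alloc()/cond_free() and slot_alloc()/slot_free() now pair a counting semaphore (s_cond_sema, s_slot_sema) with a bit array (s_cond_bits, s_slot_bits). The semaphore reserves capacity up front, so once try_inc() succeeds a clear bit is guaranteed to exist and the scan loop needs no upper bound; on the free side the bit is cleared before the semaphore is decremented, which preserves that invariant. The following is a minimal standalone C++20 sketch of the same idea. It uses plain std::atomic rather than rpcs3's atomic_t, and every name in it (kMaxSlots, g_sema, g_bits, try_inc_below, bitset_alloc, bitset_free) is illustrative only; try_inc_below is an assumption about roughly what atomic_t<>::try_inc does in the patch.

#include <atomic>
#include <bit>
#include <cstdint>

constexpr unsigned kMaxSlots = 256; // must be a multiple of 64

std::atomic<unsigned> g_sema{0};                     // counts live allocations
std::atomic<std::uint64_t> g_bits[kMaxSlots / 64]{}; // one bit per slot

// Increment the counter only while the result stays below the limit
bool try_inc_below(std::atomic<unsigned>& ctr, unsigned limit)
{
	for (unsigned v = ctr.load(); v + 1 < limit;)
	{
		// On failure, v is reloaded and the limit is rechecked
		if (ctr.compare_exchange_weak(v, v + 1))
		{
			return true;
		}
	}

	return false;
}

// Returns slot index + 1, or 0 if the limit was reached
unsigned bitset_alloc(unsigned start_hint)
{
	// Reserve capacity first: once this succeeds, a clear bit must exist
	// somewhere, so the search below cannot loop forever
	if (!try_inc_below(g_sema, kMaxSlots + 1))
	{
		return 0;
	}

	for (unsigned i = start_hint;; i++)
	{
		const unsigned group = i % (kMaxSlots / 64);

		for (std::uint64_t bits = g_bits[group].load(); bits != ~0ull;)
		{
			// bits | (bits + 1) sets the lowest clear bit
			if (g_bits[group].compare_exchange_weak(bits, bits | (bits + 1)))
			{
				return group * 64 + std::countr_one(bits) + 1;
			}
		}
	}
}

void bitset_free(unsigned id)
{
	const unsigned slot = id - 1;

	// Clear the bit before releasing capacity, mirroring slot_free() above
	g_bits[slot / 64] &= ~(1ull << (slot % 64));
	g_sema--;
}

The start_hint parameter plays the same role as the GetCurrentProcessorNumber()/__rdtsc() seeds in the patch: it spreads concurrent allocators across different bit groups so that most attempts succeed on the first compare-exchange.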