From e0f60c5dcee50bc7f1f5ce14d2d79d373ef00e75 Mon Sep 17 00:00:00 2001
From: Nekotekina
Date: Sun, 20 Oct 2019 02:41:19 +0300
Subject: [PATCH] atomic.hpp: rewrite collision handling

Remove "fallback" code path.
Remove USE_FUTEX code path temporarily.
---
 rpcs3/util/atomic.cpp | 641 +++++++++++++++++++-----------------------
 1 file changed, 293 insertions(+), 348 deletions(-)

diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp
index b276d0daae..be104bd073 100644
--- a/rpcs3/util/atomic.cpp
+++ b/rpcs3/util/atomic.cpp
@@ -19,42 +19,138 @@
 #include
 #include
 #include
+#include
+
+// Hashtable size factor (can be set to 0 to stress-test collisions)
+static constexpr uint s_hashtable_power = 17;
 
 // Total number of entries, should be a power of 2.
-static constexpr std::uintptr_t s_hashtable_size = 1u << 22;
+static constexpr std::uintptr_t s_hashtable_size = 1u << s_hashtable_power;
 
-// TODO: it's probably better to implement more effective futex emulation for OSX/BSD here.
-static atomic_t<u64> s_hashtable[s_hashtable_size]{};
+// Pointer mask without bits used as hash, assuming signed 48-bit pointers.
+static constexpr u64 s_pointer_mask = 0xffff'ffff'ffff & ~(s_hashtable_size - 1);
 
-// Pointer mask without bits used as hash, assuming signed 48-bit pointers
-static constexpr u64 s_pointer_mask = 0xffff'ffff'ffff & ~((s_hashtable_size - 1) << 2);
+// Max number of waiters is 32767.
+static constexpr u64 s_waiter_mask = s_hashtable_power ? 0x7fff'0000'0000'0000 : 0x7f00'0000'0000'0000;
 
-// Max number of waiters is 32767
-static constexpr u64 s_waiter_mask = 0x7fff'0000'0000'0000;
-
-//
+// Bit indicates that more than one address shares this hashtable slot (collision).
 static constexpr u64 s_collision_bit = 0x8000'0000'0000'0000;
 
-#ifdef USE_FUTEX
-static constexpr u64 s_sema_mask = 0;
-#else
-// Number of search groups (defines max semaphore count as gcount * 64)
-static constexpr u32 s_sema_gcount = 64;
+// Allocated slot with secondary table.
+static constexpr u64 s_slot_mask = ~(s_waiter_mask | s_pointer_mask | s_collision_bit);
 
-// Bits encoding allocated semaphore index (zero = not allocated yet)
-static constexpr u64 s_sema_mask = (64 * s_sema_gcount - 1) << 2;
+// Main hashtable for atomic wait, uses lowest pointer bits.
+static atomic_t<u64> s_hashtable[s_hashtable_size]{};
+
+// Helper to get least significant set bit from 64-bit masks
+template <u64 Mask>
+static constexpr u64 one_v = Mask & (0 - Mask);
+
+namespace
+{
+	struct slot_info
+	{
+		constexpr slot_info() noexcept = default;
+
+		// Combined allocated semaphore id and number of waiters
+		atomic_t<u64> sema_var{};
+
+		// Sub slots
+		atomic_t<u64> branch[48 - s_hashtable_power]{};
+	};
+}
+
+// Number of search groups (defines max slot branch count as gcount * 64)
+static constexpr u32 s_slot_gcount = (s_hashtable_power ?
16384 : 256) / 64; + +// Array of slot branch objects +static slot_info s_slot_list[s_slot_gcount * 64]{}; + +// Allocation bits +static atomic_t s_slot_bits[s_slot_gcount]{}; + +static u64 slot_alloc() +{ + // Diversify search start points to reduce contention and increase immediate success chance +#ifdef _WIN32 + const u32 start = GetCurrentProcessorNumber(); +#elif __linux__ + const u32 start = sched_getcpu(); +#else + const u32 start = __rdtsc(); #endif -// Implementation detail (remaining bits out of 32 available for futex) -static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask | s_collision_bit | s_sema_mask); + for (u32 i = 0;; i++) + { + const u32 group = (i + start) % s_slot_gcount; -// Callback for wait() function, returns false if wait should return -static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*) + const auto [bits, ok] = s_slot_bits[group].fetch_op([](u64& bits) + { + if (~bits) + { + // Set lowest clear bit + bits |= bits + 1; + return true; + } + + return false; + }); + + if (ok) + { + // Find lowest clear bit + return group * 64 + utils::cnttz64(~bits, false); + } + } + + // TODO: handle it somehow + std::printf("slot overflow\n"); + std::abort(); + return 0; +} + +static slot_info* slot_get(std::uintptr_t iptr, atomic_t* loc, u64 lv = 0) { - return true; -}; + if (!loc) + { + return nullptr; + } -#ifndef USE_FUTEX + const u64 value = loc->load(); + + if (!value) + { + return nullptr; + } + + if ((value & s_pointer_mask) == (iptr & s_pointer_mask)) + { + return &s_slot_list[(value & s_slot_mask) / one_v]; + } + + if ((value & s_collision_bit) == 0) + { + return nullptr; + } + + // Get the number of leading equal bits to determine subslot + const u64 eq_bits = utils::cntlz64((((iptr ^ value) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16, true); + + // Proceed recursively, increment level + return slot_get(iptr, s_slot_list[(value & s_slot_mask) / one_v].branch + eq_bits, eq_bits + 1); +} + +static void slot_free(u64 id) +{ + // Reset allocation bit + id = (id & s_slot_mask) / one_v; + s_slot_bits[id / 64] &= ~(1ull << (id % 64)); +} + +// Number of search groups (defines max semaphore count as gcount * 64) +static constexpr u32 s_sema_gcount = 128; + +static constexpr u64 s_sema_mask = (s_sema_gcount * 64 - 1); #ifdef USE_POSIX using sema_handle = sem_t; @@ -183,8 +279,6 @@ static bool sema_get(u32 id) return false; } -#endif - static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u64 mask) { switch (size) @@ -198,112 +292,11 @@ static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u6 return false; } -// Fallback implementation -namespace +// Callback for wait() function, returns false if wait should return +static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*) { - struct waiter - { - std::condition_variable cond; - void* const tls_ptr; - - explicit waiter(void* tls_ptr) - : tls_ptr(tls_ptr) - { - } - }; - - struct waiter_map - { - std::mutex mutex; - std::multimap list; - }; - - // Thread's unique node to insert without allocation - thread_local std::multimap::node_type s_tls_waiter = []() - { - // Initialize node from a dummy container (there is no separate node constructor) - std::multimap dummy; - return dummy.extract(dummy.emplace(nullptr, &s_tls_waiter)); - }(); - - waiter_map& get_fallback_map(const void* ptr) - { - static waiter_map s_waiter_maps[4096]; - - return s_waiter_maps[std::hash()(ptr) % std::size(s_waiter_maps)]; - } - - void 
fallback_wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask) - { - auto& wmap = get_fallback_map(data); - - if (!timeout) - { - return; - } - - // Update node key - s_tls_waiter.key() = data; - - if (std::unique_lock lock(wmap.mutex); ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data)) - { - // Add node to the waiter list - const auto iter = wmap.list.insert(std::move(s_tls_waiter)); - - // Wait until the node is returned to its TLS location - if (timeout + 1) - { - if (!iter->second.cond.wait_for(lock, std::chrono::nanoseconds(timeout), [&] - { - return 1 && s_tls_waiter; - })) - { - // Put it back - s_tls_waiter = wmap.list.extract(iter); - } - - return; - } - - while (!s_tls_waiter) - { - iter->second.cond.wait(lock); - } - } - } - - void fallback_notify(waiter_map& wmap, std::multimap::iterator found) - { - // Return notified node to its TLS location - const auto ptls = static_cast::node_type*>(found->second.tls_ptr); - *ptls = wmap.list.extract(found); - ptls->mapped().cond.notify_one(); - } - - void fallback_notify_one(const void* data) - { - auto& wmap = get_fallback_map(data); - - std::lock_guard lock(wmap.mutex); - - if (auto found = wmap.list.find(data); found != wmap.list.end()) - { - fallback_notify(wmap, found); - } - } - - void fallback_notify_all(const void* data) - { - auto& wmap = get_fallback_map(data); - - std::lock_guard lock(wmap.mutex); - - for (auto it = wmap.list.lower_bound(data); it != wmap.list.end() && it->first == data;) - { - fallback_notify(wmap, it++); - } - } -} + return true; +}; void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask) { @@ -314,52 +307,102 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu const std::uintptr_t iptr = reinterpret_cast(data); - atomic_t& entry = s_hashtable[(iptr >> 2) % s_hashtable_size]; + // Allocated slot index + u64 slot_a = -1; - u32 new_value = 0; + // Found slot object + slot_info* slot = nullptr; - bool fallback = false; - - u32 sema_id = -1; - - const auto [_, ok] = entry.fetch_op([&](u64& value) + auto install_op = [&](u64& value) -> u64 { - if ((value & s_waiter_mask) == s_waiter_mask || (value & s_signal_mask) == s_signal_mask) + if ((value & s_waiter_mask) == s_waiter_mask) { - // Return immediately on waiter overflow or signal overflow - return false; + // Return immediately on waiter overflow + return 0; } -#ifndef USE_FUTEX - sema_id = (value & s_sema_mask) >> 2; -#endif - if (!value || (value & s_pointer_mask) == (iptr & s_pointer_mask)) { + if (!value) + { + if (slot_a + 1 == 0) + { + // First waiter: allocate slot and install it + slot_a = slot_alloc() * one_v; + } + + value |= slot_a; + } + // Store pointer bits value |= (iptr & s_pointer_mask); - fallback = false; } else { // Set collision bit value |= s_collision_bit; - fallback = true; } - // Add waiter - value += s_waiter_mask & -s_waiter_mask; - new_value = static_cast(value); - return true; - }); + // Return slot ptr + slot = &s_slot_list[(value & s_slot_mask) / one_v]; - if (!ok) + // Add waiter + value += one_v; + return value; + }; + + // Search detail + u64 lv = 0; + + // For cleanup + std::basic_string*> install_list; + + for (atomic_t* ptr = &s_hashtable[iptr % s_hashtable_size];;) { - return; + auto [_old, ok] = ptr->fetch_op(install_op); + + if (slot_a + 1) + { + if ((ok & s_slot_mask) == slot_a) + { + // Slot set successfully + slot_a = -1; + } + } + + if (!ok) + { + // Expected only on top level + return; + } + + if 
(!_old || (_old & s_pointer_mask) == (iptr & s_pointer_mask)) + { + // Success + if (slot_a + 1) + { + // Cleanup slot if unused + slot_free(slot_a); + slot_a = -1; + } + + break; + } + + // Get the number of leading equal bits (between iptr and slot owner) + const u64 eq_bits = utils::cntlz64((((iptr ^ ok) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16, true); + + // Collision; need to go deeper + ptr = slot->branch + eq_bits; + install_list.push_back(ptr); + + lv = eq_bits + 1; } -#ifndef USE_FUTEX - for (u32 loop_count = 0; !fallback && loop_count < 7; loop_count++) + // Now try to reference a semaphore (allocate it if needed) + u32 sema_id = static_cast(slot->sema_var & s_sema_mask); + + for (u32 loop_count = 0; loop_count < 7; loop_count++) { // Try to allocate a semaphore if (!sema_id) @@ -371,16 +414,16 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu break; } - sema_id = entry.atomic_op([&](u64& value) -> u32 + sema_id = slot->sema_var.atomic_op([&](u64& value) -> u32 { if (value & s_sema_mask) { - return (value & s_sema_mask) >> 2; + return static_cast(value & s_sema_mask); } // Insert allocated semaphore - value += s_signal_mask & -s_signal_mask; - value |= (u64{sema} << 2); + value += s_sema_mask + 1; + value |= sema; return 0; }); @@ -403,47 +446,45 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu } // Try to increment sig (check semaphore validity) - const auto [_old, ok] = entry.fetch_op([&](u64& value) + const auto [_old, _new] = slot->sema_var.fetch_op([&](u64& value) -> u64 { - if ((value & s_signal_mask) == s_signal_mask) + if ((value & ~s_sema_mask) == ~s_sema_mask) { - return false; + // Signal overflow + return 0; } - if ((value & s_sema_mask) >> 2 != sema_id) + if ((value & s_sema_mask) != sema_id) { - return false; + return 0; } - value += s_signal_mask & -s_signal_mask; - return true; + value += s_sema_mask + 1; + return value; }); - if (!ok) + if (!_new) { sema_free(sema_id); - sema_id = 0; - if ((_old & s_signal_mask) == s_signal_mask) + if ((_old & ~s_sema_mask) == ~s_sema_mask) { // Break on signal overflow + sema_id = -1; break; } + sema_id = _new & s_sema_mask; continue; } break; } -#endif - if (fallback) + bool fallback = false; + + if (sema_id && ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data)) { - fallback_wait(data, size, old_value, timeout, mask); - } - else if (sema_id && ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data)) - { -#ifndef USE_FUTEX #if defined(_WIN32) && !defined(USE_POSIX) LARGE_INTEGER qw; qw.QuadPart = -static_cast(timeout / 100); @@ -506,13 +547,6 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu sema.count--; fallback = true; } -#endif -#else - struct timespec ts; - ts.tv_sec = timeout / 1'000'000'000; - ts.tv_nsec = timeout % 1'000'000'000; - - futex(reinterpret_cast(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, new_value, timeout + 1 ? 
&ts : nullptr); #endif } @@ -524,35 +558,27 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu while (true) { // Try to decrement - const auto [prev, ok] = entry.fetch_op([&](u64& value) + const auto [prev, ok] = slot->sema_var.fetch_op([&](u64& value) { - if (value & s_waiter_mask) + if (value) { -#ifndef USE_FUTEX // If timeout if (!fallback) { - if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id) + if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id) { + // Give up if signaled or semaphore has already changed return false; } - value -= s_signal_mask & -s_signal_mask; + value -= s_sema_mask + 1; - if ((value & s_signal_mask) == 0) + if ((value & ~s_sema_mask) == 0) { - value &= ~s_sema_mask; + // Remove allocated sema on last waiter + value = 0; } } -#endif - - value -= s_waiter_mask & -s_waiter_mask; - - if ((value & s_waiter_mask) == 0) - { - // Reset on last waiter - value = 0; - } return true; } @@ -565,7 +591,6 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu break; } -#ifndef USE_FUTEX #if defined(_WIN32) && !defined(USE_POSIX) static LARGE_INTEGER instant{}; @@ -588,104 +613,57 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu sema.count--; fallback = true; } -#endif #endif } -#ifndef USE_FUTEX if (sema_id) { sema_free(sema_id); } -#endif + + for (auto ptr = (install_list.empty() ? &s_hashtable[iptr % s_hashtable_size] : install_list.back());;) + { + auto [_old, ok] = ptr->fetch_op([&](u64& value) + { + if (value & s_waiter_mask) + { + value -= one_v; + + if (!(value & s_waiter_mask)) + { + // Deallocate slot on last waiter + value = 0; + return 2; + } + + return 1; + } + + return 0; + }); + + if (!ok) + { + std::abort(); + } + + if (ok > 1) + { + slot_free(_old); + } + + if (ptr == &s_hashtable[iptr % s_hashtable_size]) + { + break; + } + + install_list.pop_back(); + ptr = install_list.empty() ? 
&s_hashtable[iptr % s_hashtable_size] : install_list.back(); + } s_tls_wait_cb(nullptr); } -#ifdef USE_FUTEX - -void atomic_storage_futex::notify_one(const void* data) -{ - const std::uintptr_t iptr = reinterpret_cast(data); - - atomic_t& entry = s_hashtable[(iptr >> 2) % s_hashtable_size]; - - const auto [prev, ok] = entry.fetch_op([&](u64& value) - { - if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask)) - { - if ((value & s_signal_mask) == s_signal_mask) - { - // Signal overflow, do nothing - return false; - } - - value += s_signal_mask & -s_signal_mask; - - if ((value & s_signal_mask) == s_signal_mask) - { - // Signal will overflow, fallback to notify_all - notify_all(data); - return false; - } - - return true; - } - else if (value & s_waiter_mask && value & s_collision_bit) - { - fallback_notify_one(data); - return false; - } - - return false; - }); - - if (ok) - { - futex(reinterpret_cast(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1); - } -} - -void atomic_storage_futex::notify_all(const void* data) -{ - const std::uintptr_t iptr = reinterpret_cast(data); - - atomic_t& entry = s_hashtable[(iptr >> 2) % s_hashtable_size]; - - const auto [_, ok] = entry.fetch_op([&](u64& value) - { - if (value & s_waiter_mask) - { - if ((value & s_signal_mask) == s_signal_mask) - { - // Signal overflow, do nothing - return false; - } - - if ((value & s_pointer_mask) == (iptr & s_pointer_mask)) - { - value += s_signal_mask & -s_signal_mask; - return true; - } - - if (value & s_collision_bit) - { - fallback_notify_all(data); - return false; - } - } - - return false; - }); - - if (ok) - { - futex(reinterpret_cast(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 0x7fffffff); - } -} - -#endif - void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data)) { if (cb) @@ -702,59 +680,44 @@ void atomic_storage_futex::raw_notify(const void* data) } } -#ifndef USE_FUTEX - void atomic_storage_futex::notify_one(const void* data) { const std::uintptr_t iptr = reinterpret_cast(data); - atomic_t& entry = s_hashtable[(iptr >> 2) % s_hashtable_size]; + const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]); - const u64 value = entry; - - if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask)) - { - if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0) - { - // No relevant waiters, do nothing - return; - } - } - else if (value & s_waiter_mask && value & s_collision_bit) - { - fallback_notify_one(data); - return; - } - else + if (!slot) { return; } - const u32 sema_id = (value & s_sema_mask) >> 2; + const u64 value = slot->sema_var; + + if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask)) + { + return; + } + + const u32 sema_id = static_cast(value & s_sema_mask); if (!sema_get(sema_id)) { return; } - const auto [_, ok] = entry.fetch_op([&](u64& value) + const auto [_, ok] = slot->sema_var.fetch_op([&](u64& value) { - if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask)) + if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id) { return false; } - if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id) - { - return false; - } - - value -= s_signal_mask & -s_signal_mask; + value -= s_sema_mask + 1; // Reset allocated semaphore on last waiter - if ((value & s_signal_mask) == 0) + if ((value & ~s_sema_mask) == 0) { - value &= ~s_sema_mask; + value = 0; } return true; @@ -783,51 +746,35 @@ void atomic_storage_futex::notify_all(const void* data) { const 
std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
 
-	atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
+	const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
 
-	const u64 value = entry;
-
-	if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
-	{
-		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0)
-		{
-			// No relevant waiters, do nothing
-			return;
-		}
-	}
-	else if (value & s_waiter_mask && value & s_collision_bit)
-	{
-		fallback_notify_all(data);
-		return;
-	}
-	else
+	if (!slot)
 	{
 		return;
 	}
 
-	const u32 sema_id = (value & s_sema_mask) >> 2;
+	const u64 value = slot->sema_var;
+
+	if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask))
+	{
+		return;
+	}
+
+	const u32 sema_id = static_cast<u32>(value & s_sema_mask);
 
 	if (!sema_get(sema_id))
 	{
 		return;
 	}
 
-	const auto [_, count] = entry.fetch_op([&](u64& value) -> u32
+	const auto [_, count] = slot->sema_var.fetch_op([&](u64& value) -> u32
 	{
-		if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask))
+		if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
 		{
 			return 0;
 		}
 
-		if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
-		{
-			return 0;
-		}
-
-		const u32 r = (value & s_signal_mask) / (s_signal_mask & -s_signal_mask);
-		value &= ~s_sema_mask;
-		value &= ~s_signal_mask;
-		return r;
+		return (std::exchange(value, 0) & ~s_sema_mask) / (s_sema_mask + 1);
 	});
 
 #ifdef USE_POSIX
@@ -854,5 +801,3 @@ void atomic_storage_futex::notify_all(const void* data)
 
 	sema_free(sema_id);
 }
-
-#endif
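
For orientation, the standalone sketch below illustrates the 64-bit hashtable entry layout implied by the masks defined at the top of this patch (s_pointer_mask, s_waiter_mask, s_collision_bit, s_slot_mask with s_hashtable_power = 17). It is an illustrative example only, not code from the rpcs3 sources: the demo namespace, the make_entry helper, and the sample address are invented for the demonstration, and std::uint64_t stands in for rpcs3's u64 and atomic_t<u64>.

// Minimal, self-contained sketch of the packed hashtable entry used by this design.
#include <cassert>
#include <cstdint>
#include <cstdio>

namespace demo
{
	constexpr unsigned hashtable_power = 17;
	constexpr std::uint64_t hashtable_size = 1ull << hashtable_power;

	// Low pointer bits select the bucket; the remaining 48-bit pointer bits are stored in the entry.
	constexpr std::uint64_t pointer_mask  = 0xffff'ffff'ffffull & ~(hashtable_size - 1);
	constexpr std::uint64_t waiter_mask   = 0x7fff'0000'0000'0000ull; // up to 32767 waiters
	constexpr std::uint64_t collision_bit = 0x8000'0000'0000'0000ull;
	constexpr std::uint64_t slot_mask     = ~(waiter_mask | pointer_mask | collision_bit);

	// Least significant set bit of a mask: adding it increments the packed field by one.
	constexpr std::uint64_t one(std::uint64_t mask) { return mask & (0 - mask); }

	// Compose an entry for the first waiter on address iptr, owning secondary slot `slot`.
	constexpr std::uint64_t make_entry(std::uint64_t iptr, std::uint64_t slot)
	{
		return (iptr & pointer_mask) | (slot * one(slot_mask)) | one(waiter_mask);
	}
}

int main()
{
	constexpr std::uint64_t iptr = 0x0000'7f12'3456'7890ull; // hypothetical waited-on address
	std::uint64_t entry = demo::make_entry(iptr, /*slot*/ 42);

	// A second waiter on the same address only bumps the waiter counter.
	entry += demo::one(demo::waiter_mask);

	assert((entry & demo::pointer_mask) == (iptr & demo::pointer_mask));
	assert((entry & demo::slot_mask) / demo::one(demo::slot_mask) == 42);
	std::printf("waiters: %llu\n",
		(unsigned long long)((entry & demo::waiter_mask) / demo::one(demo::waiter_mask)));
}

Adding the least significant bit of a field's mask is how the patch increments a packed counter without disturbing the other fields (value += one_v<s_waiter_mask> in wait()); the sketch mimics that with demo::one(waiter_mask).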