mirror of
https://github.com/RPCS3/rpcs3.git
synced 2024-11-17 08:11:51 +00:00
atomic.hpp: rewrite collision handling
Remove "fallback" code path. Remove USE_FUTEX code path temporarily.
This commit is contained in:
parent
79a3a7ce4c
commit
e0f60c5dce
@ -19,42 +19,138 @@
|
||||
#include <condition_variable>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
// Hashtable size factor (can be set to 0 to stress-test collisions)
|
||||
static constexpr uint s_hashtable_power = 17;
|
||||
|
||||
// Total number of entries, should be a power of 2.
|
||||
static constexpr std::uintptr_t s_hashtable_size = 1u << 22;
|
||||
static constexpr std::uintptr_t s_hashtable_size = 1u << s_hashtable_power;
|
||||
|
||||
// TODO: it's probably better to implement more effective futex emulation for OSX/BSD here.
|
||||
static atomic_t<u64> s_hashtable[s_hashtable_size]{};
|
||||
// Pointer mask without bits used as hash, assuming signed 48-bit pointers.
|
||||
static constexpr u64 s_pointer_mask = 0xffff'ffff'ffff & ~((s_hashtable_size - 1));
|
||||
|
||||
// Pointer mask without bits used as hash, assuming signed 48-bit pointers
|
||||
static constexpr u64 s_pointer_mask = 0xffff'ffff'ffff & ~((s_hashtable_size - 1) << 2);
|
||||
// Max number of waiters is 32767.
|
||||
static constexpr u64 s_waiter_mask = s_hashtable_power ? 0x7fff'0000'0000'0000 : 0x7f00'0000'0000'0000;
|
||||
|
||||
// Max number of waiters is 32767
|
||||
static constexpr u64 s_waiter_mask = 0x7fff'0000'0000'0000;
|
||||
|
||||
//
|
||||
// Bit indicates that more than one.
|
||||
static constexpr u64 s_collision_bit = 0x8000'0000'0000'0000;
|
||||
|
||||
#ifdef USE_FUTEX
|
||||
static constexpr u64 s_sema_mask = 0;
|
||||
#else
|
||||
// Number of search groups (defines max semaphore count as gcount * 64)
|
||||
static constexpr u32 s_sema_gcount = 64;
|
||||
// Allocated slot with secondary table.
|
||||
static constexpr u64 s_slot_mask = ~(s_waiter_mask | s_pointer_mask | s_collision_bit);
|
||||
|
||||
// Bits encoding allocated semaphore index (zero = not allocated yet)
|
||||
static constexpr u64 s_sema_mask = (64 * s_sema_gcount - 1) << 2;
|
||||
// Main hashtable for atomic wait, uses lowest pointer bits.
|
||||
static atomic_t<u64> s_hashtable[s_hashtable_size]{};
|
||||
|
||||
// Helper to get least significant set bit from 64-bit masks
|
||||
template <u64 Mask>
|
||||
static constexpr u64 one_v = Mask & (0 - Mask);
|
||||
|
||||
namespace
|
||||
{
|
||||
struct slot_info
|
||||
{
|
||||
constexpr slot_info() noexcept = default;
|
||||
|
||||
// Combined allocated semaphore id and number of waiters
|
||||
atomic_t<u64> sema_var{};
|
||||
|
||||
// Sub slots
|
||||
atomic_t<u64> branch[48 - s_hashtable_power]{};
|
||||
};
|
||||
}
|
||||
|
||||
// Number of search groups (defines max slot branch count as gcount * 64)
|
||||
static constexpr u32 s_slot_gcount = (s_hashtable_power ? 16384 : 256) / 64;
|
||||
|
||||
// Array of slot branch objects
|
||||
static slot_info s_slot_list[s_slot_gcount * 64]{};
|
||||
|
||||
// Allocation bits
|
||||
static atomic_t<u64> s_slot_bits[s_slot_gcount]{};
|
||||
|
||||
static u64 slot_alloc()
|
||||
{
|
||||
// Diversify search start points to reduce contention and increase immediate success chance
|
||||
#ifdef _WIN32
|
||||
const u32 start = GetCurrentProcessorNumber();
|
||||
#elif __linux__
|
||||
const u32 start = sched_getcpu();
|
||||
#else
|
||||
const u32 start = __rdtsc();
|
||||
#endif
|
||||
|
||||
// Implementation detail (remaining bits out of 32 available for futex)
|
||||
static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask | s_collision_bit | s_sema_mask);
|
||||
for (u32 i = 0;; i++)
|
||||
{
|
||||
const u32 group = (i + start) % s_slot_gcount;
|
||||
|
||||
// Callback for wait() function, returns false if wait should return
|
||||
static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*)
|
||||
const auto [bits, ok] = s_slot_bits[group].fetch_op([](u64& bits)
|
||||
{
|
||||
if (~bits)
|
||||
{
|
||||
// Set lowest clear bit
|
||||
bits |= bits + 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
|
||||
if (ok)
|
||||
{
|
||||
// Find lowest clear bit
|
||||
return group * 64 + utils::cnttz64(~bits, false);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: handle it somehow
|
||||
std::printf("slot overflow\n");
|
||||
std::abort();
|
||||
return 0;
|
||||
}
|
||||
|
||||
static slot_info* slot_get(std::uintptr_t iptr, atomic_t<u64>* loc, u64 lv = 0)
|
||||
{
|
||||
return true;
|
||||
};
|
||||
if (!loc)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
const u64 value = loc->load();
|
||||
|
||||
if (!value)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
return &s_slot_list[(value & s_slot_mask) / one_v<s_slot_mask>];
|
||||
}
|
||||
|
||||
if ((value & s_collision_bit) == 0)
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Get the number of leading equal bits to determine subslot
|
||||
const u64 eq_bits = utils::cntlz64((((iptr ^ value) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16, true);
|
||||
|
||||
// Proceed recursively, increment level
|
||||
return slot_get(iptr, s_slot_list[(value & s_slot_mask) / one_v<s_slot_mask>].branch + eq_bits, eq_bits + 1);
|
||||
}
|
||||
|
||||
static void slot_free(u64 id)
|
||||
{
|
||||
// Reset allocation bit
|
||||
id = (id & s_slot_mask) / one_v<s_slot_mask>;
|
||||
s_slot_bits[id / 64] &= ~(1ull << (id % 64));
|
||||
}
|
||||
|
||||
// Number of search groups (defines max semaphore count as gcount * 64)
|
||||
static constexpr u32 s_sema_gcount = 128;
|
||||
|
||||
static constexpr u64 s_sema_mask = (s_sema_gcount * 64 - 1);
|
||||
|
||||
#ifdef USE_POSIX
|
||||
using sema_handle = sem_t;
|
||||
@ -183,8 +279,6 @@ static bool sema_get(u32 id)
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u64 mask)
|
||||
{
|
||||
switch (size)
|
||||
@ -198,112 +292,11 @@ static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u6
|
||||
return false;
|
||||
}
|
||||
|
||||
// Fallback implementation
|
||||
namespace
|
||||
// Callback for wait() function, returns false if wait should return
|
||||
static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*)
|
||||
{
|
||||
struct waiter
|
||||
{
|
||||
std::condition_variable cond;
|
||||
void* const tls_ptr;
|
||||
|
||||
explicit waiter(void* tls_ptr)
|
||||
: tls_ptr(tls_ptr)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
struct waiter_map
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::multimap<const void*, waiter> list;
|
||||
};
|
||||
|
||||
// Thread's unique node to insert without allocation
|
||||
thread_local std::multimap<const void*, waiter>::node_type s_tls_waiter = []()
|
||||
{
|
||||
// Initialize node from a dummy container (there is no separate node constructor)
|
||||
std::multimap<const void*, waiter> dummy;
|
||||
return dummy.extract(dummy.emplace(nullptr, &s_tls_waiter));
|
||||
}();
|
||||
|
||||
waiter_map& get_fallback_map(const void* ptr)
|
||||
{
|
||||
static waiter_map s_waiter_maps[4096];
|
||||
|
||||
return s_waiter_maps[std::hash<const void*>()(ptr) % std::size(s_waiter_maps)];
|
||||
}
|
||||
|
||||
void fallback_wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask)
|
||||
{
|
||||
auto& wmap = get_fallback_map(data);
|
||||
|
||||
if (!timeout)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Update node key
|
||||
s_tls_waiter.key() = data;
|
||||
|
||||
if (std::unique_lock lock(wmap.mutex); ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data))
|
||||
{
|
||||
// Add node to the waiter list
|
||||
const auto iter = wmap.list.insert(std::move(s_tls_waiter));
|
||||
|
||||
// Wait until the node is returned to its TLS location
|
||||
if (timeout + 1)
|
||||
{
|
||||
if (!iter->second.cond.wait_for(lock, std::chrono::nanoseconds(timeout), [&]
|
||||
{
|
||||
return 1 && s_tls_waiter;
|
||||
}))
|
||||
{
|
||||
// Put it back
|
||||
s_tls_waiter = wmap.list.extract(iter);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
while (!s_tls_waiter)
|
||||
{
|
||||
iter->second.cond.wait(lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fallback_notify(waiter_map& wmap, std::multimap<const void*, waiter>::iterator found)
|
||||
{
|
||||
// Return notified node to its TLS location
|
||||
const auto ptls = static_cast<std::multimap<const void*, waiter>::node_type*>(found->second.tls_ptr);
|
||||
*ptls = wmap.list.extract(found);
|
||||
ptls->mapped().cond.notify_one();
|
||||
}
|
||||
|
||||
void fallback_notify_one(const void* data)
|
||||
{
|
||||
auto& wmap = get_fallback_map(data);
|
||||
|
||||
std::lock_guard lock(wmap.mutex);
|
||||
|
||||
if (auto found = wmap.list.find(data); found != wmap.list.end())
|
||||
{
|
||||
fallback_notify(wmap, found);
|
||||
}
|
||||
}
|
||||
|
||||
void fallback_notify_all(const void* data)
|
||||
{
|
||||
auto& wmap = get_fallback_map(data);
|
||||
|
||||
std::lock_guard lock(wmap.mutex);
|
||||
|
||||
for (auto it = wmap.list.lower_bound(data); it != wmap.list.end() && it->first == data;)
|
||||
{
|
||||
fallback_notify(wmap, it++);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_value, u64 timeout, u64 mask)
|
||||
{
|
||||
@ -314,52 +307,102 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
|
||||
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
|
||||
|
||||
atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
|
||||
// Allocated slot index
|
||||
u64 slot_a = -1;
|
||||
|
||||
u32 new_value = 0;
|
||||
// Found slot object
|
||||
slot_info* slot = nullptr;
|
||||
|
||||
bool fallback = false;
|
||||
|
||||
u32 sema_id = -1;
|
||||
|
||||
const auto [_, ok] = entry.fetch_op([&](u64& value)
|
||||
auto install_op = [&](u64& value) -> u64
|
||||
{
|
||||
if ((value & s_waiter_mask) == s_waiter_mask || (value & s_signal_mask) == s_signal_mask)
|
||||
if ((value & s_waiter_mask) == s_waiter_mask)
|
||||
{
|
||||
// Return immediately on waiter overflow or signal overflow
|
||||
return false;
|
||||
// Return immediately on waiter overflow
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
sema_id = (value & s_sema_mask) >> 2;
|
||||
#endif
|
||||
|
||||
if (!value || (value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
if (!value)
|
||||
{
|
||||
if (slot_a + 1 == 0)
|
||||
{
|
||||
// First waiter: allocate slot and install it
|
||||
slot_a = slot_alloc() * one_v<s_slot_mask>;
|
||||
}
|
||||
|
||||
value |= slot_a;
|
||||
}
|
||||
|
||||
// Store pointer bits
|
||||
value |= (iptr & s_pointer_mask);
|
||||
fallback = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Set collision bit
|
||||
value |= s_collision_bit;
|
||||
fallback = true;
|
||||
}
|
||||
|
||||
// Add waiter
|
||||
value += s_waiter_mask & -s_waiter_mask;
|
||||
new_value = static_cast<u32>(value);
|
||||
return true;
|
||||
});
|
||||
// Return slot ptr
|
||||
slot = &s_slot_list[(value & s_slot_mask) / one_v<s_slot_mask>];
|
||||
|
||||
if (!ok)
|
||||
// Add waiter
|
||||
value += one_v<s_waiter_mask>;
|
||||
return value;
|
||||
};
|
||||
|
||||
// Search detail
|
||||
u64 lv = 0;
|
||||
|
||||
// For cleanup
|
||||
std::basic_string<atomic_t<u64>*> install_list;
|
||||
|
||||
for (atomic_t<u64>* ptr = &s_hashtable[iptr % s_hashtable_size];;)
|
||||
{
|
||||
return;
|
||||
auto [_old, ok] = ptr->fetch_op(install_op);
|
||||
|
||||
if (slot_a + 1)
|
||||
{
|
||||
if ((ok & s_slot_mask) == slot_a)
|
||||
{
|
||||
// Slot set successfully
|
||||
slot_a = -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
// Expected only on top level
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_old || (_old & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
// Success
|
||||
if (slot_a + 1)
|
||||
{
|
||||
// Cleanup slot if unused
|
||||
slot_free(slot_a);
|
||||
slot_a = -1;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Get the number of leading equal bits (between iptr and slot owner)
|
||||
const u64 eq_bits = utils::cntlz64((((iptr ^ ok) & (s_pointer_mask >> lv)) | ~s_pointer_mask) << 16, true);
|
||||
|
||||
// Collision; need to go deeper
|
||||
ptr = slot->branch + eq_bits;
|
||||
install_list.push_back(ptr);
|
||||
|
||||
lv = eq_bits + 1;
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
for (u32 loop_count = 0; !fallback && loop_count < 7; loop_count++)
|
||||
// Now try to reference a semaphore (allocate it if needed)
|
||||
u32 sema_id = static_cast<u32>(slot->sema_var & s_sema_mask);
|
||||
|
||||
for (u32 loop_count = 0; loop_count < 7; loop_count++)
|
||||
{
|
||||
// Try to allocate a semaphore
|
||||
if (!sema_id)
|
||||
@ -371,16 +414,16 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
break;
|
||||
}
|
||||
|
||||
sema_id = entry.atomic_op([&](u64& value) -> u32
|
||||
sema_id = slot->sema_var.atomic_op([&](u64& value) -> u32
|
||||
{
|
||||
if (value & s_sema_mask)
|
||||
{
|
||||
return (value & s_sema_mask) >> 2;
|
||||
return static_cast<u32>(value & s_sema_mask);
|
||||
}
|
||||
|
||||
// Insert allocated semaphore
|
||||
value += s_signal_mask & -s_signal_mask;
|
||||
value |= (u64{sema} << 2);
|
||||
value += s_sema_mask + 1;
|
||||
value |= sema;
|
||||
return 0;
|
||||
});
|
||||
|
||||
@ -403,47 +446,45 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
}
|
||||
|
||||
// Try to increment sig (check semaphore validity)
|
||||
const auto [_old, ok] = entry.fetch_op([&](u64& value)
|
||||
const auto [_old, _new] = slot->sema_var.fetch_op([&](u64& value) -> u64
|
||||
{
|
||||
if ((value & s_signal_mask) == s_signal_mask)
|
||||
if ((value & ~s_sema_mask) == ~s_sema_mask)
|
||||
{
|
||||
return false;
|
||||
// Signal overflow
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ((value & s_sema_mask) >> 2 != sema_id)
|
||||
if ((value & s_sema_mask) != sema_id)
|
||||
{
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
value += s_signal_mask & -s_signal_mask;
|
||||
return true;
|
||||
value += s_sema_mask + 1;
|
||||
return value;
|
||||
});
|
||||
|
||||
if (!ok)
|
||||
if (!_new)
|
||||
{
|
||||
sema_free(sema_id);
|
||||
sema_id = 0;
|
||||
|
||||
if ((_old & s_signal_mask) == s_signal_mask)
|
||||
if ((_old & ~s_sema_mask) == ~s_sema_mask)
|
||||
{
|
||||
// Break on signal overflow
|
||||
sema_id = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
sema_id = _new & s_sema_mask;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (fallback)
|
||||
bool fallback = false;
|
||||
|
||||
if (sema_id && ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data))
|
||||
{
|
||||
fallback_wait(data, size, old_value, timeout, mask);
|
||||
}
|
||||
else if (sema_id && ptr_cmp(data, size, old_value, mask) && s_tls_wait_cb(data))
|
||||
{
|
||||
#ifndef USE_FUTEX
|
||||
#if defined(_WIN32) && !defined(USE_POSIX)
|
||||
LARGE_INTEGER qw;
|
||||
qw.QuadPart = -static_cast<s64>(timeout / 100);
|
||||
@ -506,13 +547,6 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
sema.count--;
|
||||
fallback = true;
|
||||
}
|
||||
#endif
|
||||
#else
|
||||
struct timespec ts;
|
||||
ts.tv_sec = timeout / 1'000'000'000;
|
||||
ts.tv_nsec = timeout % 1'000'000'000;
|
||||
|
||||
futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAIT_PRIVATE, new_value, timeout + 1 ? &ts : nullptr);
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -524,35 +558,27 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
while (true)
|
||||
{
|
||||
// Try to decrement
|
||||
const auto [prev, ok] = entry.fetch_op([&](u64& value)
|
||||
const auto [prev, ok] = slot->sema_var.fetch_op([&](u64& value)
|
||||
{
|
||||
if (value & s_waiter_mask)
|
||||
if (value)
|
||||
{
|
||||
#ifndef USE_FUTEX
|
||||
// If timeout
|
||||
if (!fallback)
|
||||
{
|
||||
if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
|
||||
if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
|
||||
{
|
||||
// Give up if signaled or semaphore has already changed
|
||||
return false;
|
||||
}
|
||||
|
||||
value -= s_signal_mask & -s_signal_mask;
|
||||
value -= s_sema_mask + 1;
|
||||
|
||||
if ((value & s_signal_mask) == 0)
|
||||
if ((value & ~s_sema_mask) == 0)
|
||||
{
|
||||
value &= ~s_sema_mask;
|
||||
// Remove allocated sema on last waiter
|
||||
value = 0;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
value -= s_waiter_mask & -s_waiter_mask;
|
||||
|
||||
if ((value & s_waiter_mask) == 0)
|
||||
{
|
||||
// Reset on last waiter
|
||||
value = 0;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -565,7 +591,6 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
break;
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
#if defined(_WIN32) && !defined(USE_POSIX)
|
||||
static LARGE_INTEGER instant{};
|
||||
|
||||
@ -588,104 +613,57 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
|
||||
sema.count--;
|
||||
fallback = true;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
if (sema_id)
|
||||
{
|
||||
sema_free(sema_id);
|
||||
}
|
||||
#endif
|
||||
|
||||
for (auto ptr = (install_list.empty() ? &s_hashtable[iptr % s_hashtable_size] : install_list.back());;)
|
||||
{
|
||||
auto [_old, ok] = ptr->fetch_op([&](u64& value)
|
||||
{
|
||||
if (value & s_waiter_mask)
|
||||
{
|
||||
value -= one_v<s_waiter_mask>;
|
||||
|
||||
if (!(value & s_waiter_mask))
|
||||
{
|
||||
// Deallocate slot on last waiter
|
||||
value = 0;
|
||||
return 2;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
if (!ok)
|
||||
{
|
||||
std::abort();
|
||||
}
|
||||
|
||||
if (ok > 1)
|
||||
{
|
||||
slot_free(_old);
|
||||
}
|
||||
|
||||
if (ptr == &s_hashtable[iptr % s_hashtable_size])
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
install_list.pop_back();
|
||||
ptr = install_list.empty() ? &s_hashtable[iptr % s_hashtable_size] : install_list.back();
|
||||
}
|
||||
|
||||
s_tls_wait_cb(nullptr);
|
||||
}
|
||||
|
||||
#ifdef USE_FUTEX
|
||||
|
||||
void atomic_storage_futex::notify_one(const void* data)
|
||||
{
|
||||
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
|
||||
|
||||
atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
|
||||
|
||||
const auto [prev, ok] = entry.fetch_op([&](u64& value)
|
||||
{
|
||||
if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
if ((value & s_signal_mask) == s_signal_mask)
|
||||
{
|
||||
// Signal overflow, do nothing
|
||||
return false;
|
||||
}
|
||||
|
||||
value += s_signal_mask & -s_signal_mask;
|
||||
|
||||
if ((value & s_signal_mask) == s_signal_mask)
|
||||
{
|
||||
// Signal will overflow, fallback to notify_all
|
||||
notify_all(data);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
else if (value & s_waiter_mask && value & s_collision_bit)
|
||||
{
|
||||
fallback_notify_one(data);
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
|
||||
if (ok)
|
||||
{
|
||||
futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 1);
|
||||
}
|
||||
}
|
||||
|
||||
void atomic_storage_futex::notify_all(const void* data)
|
||||
{
|
||||
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
|
||||
|
||||
atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
|
||||
|
||||
const auto [_, ok] = entry.fetch_op([&](u64& value)
|
||||
{
|
||||
if (value & s_waiter_mask)
|
||||
{
|
||||
if ((value & s_signal_mask) == s_signal_mask)
|
||||
{
|
||||
// Signal overflow, do nothing
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
value += s_signal_mask & -s_signal_mask;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (value & s_collision_bit)
|
||||
{
|
||||
fallback_notify_all(data);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
|
||||
if (ok)
|
||||
{
|
||||
futex(reinterpret_cast<char*>(&entry) + 4 * IS_BE_MACHINE, FUTEX_WAKE_PRIVATE, 0x7fffffff);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
|
||||
{
|
||||
if (cb)
|
||||
@ -702,59 +680,44 @@ void atomic_storage_futex::raw_notify(const void* data)
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef USE_FUTEX
|
||||
|
||||
void atomic_storage_futex::notify_one(const void* data)
|
||||
{
|
||||
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
|
||||
|
||||
atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
|
||||
const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
|
||||
|
||||
const u64 value = entry;
|
||||
|
||||
if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0)
|
||||
{
|
||||
// No relevant waiters, do nothing
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (value & s_waiter_mask && value & s_collision_bit)
|
||||
{
|
||||
fallback_notify_one(data);
|
||||
return;
|
||||
}
|
||||
else
|
||||
if (!slot)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const u32 sema_id = (value & s_sema_mask) >> 2;
|
||||
const u64 value = slot->sema_var;
|
||||
|
||||
if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const u32 sema_id = static_cast<u32>(value & s_sema_mask);
|
||||
|
||||
if (!sema_get(sema_id))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const auto [_, ok] = entry.fetch_op([&](u64& value)
|
||||
const auto [_, ok] = slot->sema_var.fetch_op([&](u64& value)
|
||||
{
|
||||
if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask))
|
||||
if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
value -= s_signal_mask & -s_signal_mask;
|
||||
value -= s_sema_mask + 1;
|
||||
|
||||
// Reset allocated semaphore on last waiter
|
||||
if ((value & s_signal_mask) == 0)
|
||||
if ((value & ~s_sema_mask) == 0)
|
||||
{
|
||||
value &= ~s_sema_mask;
|
||||
value = 0;
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -783,51 +746,35 @@ void atomic_storage_futex::notify_all(const void* data)
|
||||
{
|
||||
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
|
||||
|
||||
atomic_t<u64>& entry = s_hashtable[(iptr >> 2) % s_hashtable_size];
|
||||
const auto slot = slot_get(iptr, &s_hashtable[(iptr) % s_hashtable_size]);
|
||||
|
||||
const u64 value = entry;
|
||||
|
||||
if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
|
||||
{
|
||||
if ((value & s_signal_mask) == 0 || (value & s_sema_mask) == 0)
|
||||
{
|
||||
// No relevant waiters, do nothing
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (value & s_waiter_mask && value & s_collision_bit)
|
||||
{
|
||||
fallback_notify_all(data);
|
||||
return;
|
||||
}
|
||||
else
|
||||
if (!slot)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const u32 sema_id = (value & s_sema_mask) >> 2;
|
||||
const u64 value = slot->sema_var;
|
||||
|
||||
if ((value & ~s_sema_mask) == 0 || !(value & s_sema_mask))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const u32 sema_id = static_cast<u32>(value & s_sema_mask);
|
||||
|
||||
if (!sema_get(sema_id))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
const auto [_, count] = entry.fetch_op([&](u64& value) -> u32
|
||||
const auto [_, count] = slot->sema_var.fetch_op([&](u64& value) -> u32
|
||||
{
|
||||
if ((value & s_waiter_mask) == 0 || (value & s_pointer_mask) != (iptr & s_pointer_mask))
|
||||
if ((value & ~s_sema_mask) == 0 || (value & s_sema_mask) != sema_id)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ((value & s_signal_mask) == 0 || (value & s_sema_mask) >> 2 != sema_id)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
const u32 r = (value & s_signal_mask) / (s_signal_mask & -s_signal_mask);
|
||||
value &= ~s_sema_mask;
|
||||
value &= ~s_signal_mask;
|
||||
return r;
|
||||
return (std::exchange(value, 0) & ~s_sema_mask) / (s_sema_mask + 1);
|
||||
});
|
||||
|
||||
#ifdef USE_POSIX
|
||||
@ -854,5 +801,3 @@ void atomic_storage_futex::notify_all(const void* data)
|
||||
|
||||
sema_free(sema_id);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user