atomic.hpp: implement collision fallback properly

Should prevent devastating effect of collisions
This commit is contained in:
Nekotekina 2019-09-09 12:28:21 +03:00
parent 0a96497e13
commit 4504ac2d12

View File

@ -15,11 +15,14 @@ static atomic_t<u64> s_hashtable[s_hashtable_size];
// Pointer mask without bits used as hash, assuming signed 48-bit pointers
static constexpr u64 s_pointer_mask = 0xffff'ffff'ffff & ~(s_hashtable_size - 1);
// Max number of waiters is 65535
static constexpr u64 s_waiter_mask = 0xffff'0000'0000'0000;
// Max number of waiters is 32767
static constexpr u64 s_waiter_mask = 0x7fff'0000'0000'0000;
//
static constexpr u64 s_collision_bit = 0x8000'0000'0000'0000;
// Implementation detail (remaining bits out of 32 available for futex)
static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask);
static constexpr u64 s_signal_mask = 0xffffffff & ~(s_waiter_mask | s_pointer_mask | s_collision_bit);
// Callback for wait() function, returns false if wait should return
static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*)
@ -179,6 +182,8 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
u32 new_value = 0;
bool fallback = false;
const auto [_, ok] = entry.fetch_op([&](u64& value)
{
if ((value & s_waiter_mask) == s_waiter_mask || (value & s_signal_mask) == s_signal_mask)
@ -191,11 +196,17 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
{
// Store pointer bits
value |= (iptr & s_pointer_mask);
fallback = false;
#ifdef _WIN32
value += s_signal_mask & -s_signal_mask;
#endif
}
else
{
// Set pointer bits to all ones (collision, TODO)
value |= s_pointer_mask;
// Set collision bit
value |= s_collision_bit;
fallback = true;
}
// Add waiter
@ -209,7 +220,11 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
return;
}
if (ptr_cmp(data, size, old_value) && s_tls_wait_cb(data))
if (fallback)
{
fallback_wait(data, size, old_value, timeout);
}
else if (ptr_cmp(data, size, old_value) && s_tls_wait_cb(data))
{
#ifdef _WIN32
LARGE_INTEGER qw;
@ -245,6 +260,13 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
{
value -= s_waiter_mask & -s_waiter_mask;
#ifdef _WIN32
if (!fallback)
{
value -= s_signal_mask & -s_signal_mask;
}
#endif
if ((value & s_waiter_mask) == 0)
{
// Reset on last waiter
@ -257,7 +279,7 @@ void atomic_storage_futex::wait(const void* data, std::size_t size, u64 old_valu
return false;
});
if (ok)
if (ok || fallback)
{
break;
}
@ -284,15 +306,20 @@ void atomic_storage_futex::notify_one(const void* data)
atomic_t<u64>& entry = s_hashtable[iptr % s_hashtable_size];
bool fallback = false;
const auto [prev, ok] = entry.fetch_op([&](u64& value)
{
if (value & s_waiter_mask && (value & s_pointer_mask) == (iptr & s_pointer_mask))
{
#ifdef _WIN32
// Try to decrement if no collision
if ((value & s_signal_mask) == 0)
{
// No relevant waiters, do nothing
return false;
}
// Try to decrement if possible
value -= s_waiter_mask & -s_waiter_mask;
value -= s_signal_mask & -s_signal_mask;
if ((value & s_waiter_mask) == 0)
{
@ -310,30 +337,22 @@ void atomic_storage_futex::notify_one(const void* data)
if ((value & s_signal_mask) == s_signal_mask)
{
// Signal will overflow, fallback
fallback = true;
// Signal will overflow, fallback to notify_all
notify_all(data);
return false;
}
#endif
return true;
}
if (value & s_waiter_mask && (value & s_pointer_mask) == s_pointer_mask)
else if (value & s_waiter_mask && value & s_collision_bit)
{
// Collision, notify everything
fallback = true;
fallback_notify_one(data);
return false;
}
return false;
});
if (fallback)
{
notify_all(data);
return;
}
if (ok)
{
#ifdef _WIN32
@ -356,11 +375,32 @@ void atomic_storage_futex::notify_all(const void* data)
{
if (value & s_waiter_mask)
{
if ((value & s_pointer_mask) == s_pointer_mask || (value & s_pointer_mask) == (iptr & s_pointer_mask))
if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
{
value = 0;
if ((value & s_signal_mask) == 0)
{
// No relevant waiters, do nothing
return false;
}
const u64 count = (value & s_signal_mask) / (s_signal_mask & -s_signal_mask);
value -= (s_waiter_mask & -s_waiter_mask) * count;
value -= (s_signal_mask & -s_signal_mask) * count;
if ((value & s_waiter_mask) == 0)
{
// Reset on last waiter
value = 0;
}
return true;
}
if (value & s_collision_bit)
{
fallback_notify_all(data);
return false;
}
}
return false;
@ -371,7 +411,7 @@ void atomic_storage_futex::notify_all(const void* data)
return;
}
for (u64 count = old & s_waiter_mask; count; count -= s_waiter_mask & -s_waiter_mask)
for (u64 count = old & s_signal_mask; count; count -= s_signal_mask & -s_signal_mask)
{
NtReleaseKeyedEvent(nullptr, &entry, false, nullptr);
}
@ -386,11 +426,17 @@ void atomic_storage_futex::notify_all(const void* data)
return false;
}
if ((value & s_pointer_mask) == s_pointer_mask || (value & s_pointer_mask) == (iptr & s_pointer_mask))
if ((value & s_pointer_mask) == (iptr & s_pointer_mask))
{
value += s_signal_mask & -s_signal_mask;
return true;
}
if (value & s_collision_bit)
{
fallback_notify_all(data);
return false;
}
}
return false;