atomic.cpp: more improvements

Add ref counters for internal semaphores for concurrent notifying.
Lack of them could result in loss of notification.
This commit is contained in:
Nekotekina 2020-11-06 00:06:58 +03:00
parent 1bb7c74c93
commit 9fb8d449fe

View File

@ -183,14 +183,14 @@ namespace
#endif
}
// Max allowed thread number is chosen to fit in 15 bits
static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[INT16_MAX]{};
// Max allowed thread number is chosen to fit in 16 bits
static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[UINT16_MAX]{};
// Used to allow concurrent notifying
static atomic_t<u16> s_cond_refs[INT16_MAX]{};
static atomic_t<u16> s_cond_refs[UINT16_MAX + 1]{};
// Allocation bits
static atomic_t<u64, 64> s_cond_bits[::align<u32>(INT16_MAX, 64) / 64]{};
static atomic_t<u64, 64> s_cond_bits[::align<u32>(UINT16_MAX, 64) / 64]{};
// Allocation semaphore
static atomic_t<u32, 64> s_cond_sema{0};
@ -198,7 +198,7 @@ static atomic_t<u32, 64> s_cond_sema{0};
static u32 cond_alloc()
{
// Determine whether there is a free slot or not
if (!s_cond_sema.try_inc(INT16_MAX + 1))
if (!s_cond_sema.try_inc(UINT16_MAX + 1))
{
return 0;
}
@ -236,6 +236,9 @@ static u32 cond_alloc()
// Construct inplace before it can be used
new (s_cond_list + id) cond_handle();
// Add first reference
verify(HERE), !s_cond_refs[id]++;
return id + 1;
}
}
@ -247,7 +250,7 @@ static u32 cond_alloc()
static cond_handle* cond_get(u32 cond_id)
{
if (cond_id - 1 < u32{INT16_MAX}) [[likely]]
if (cond_id - 1 < u32{UINT16_MAX}) [[likely]]
{
return std::launder(reinterpret_cast<cond_handle*>(s_cond_list + (cond_id - 1)));
}
@ -257,12 +260,18 @@ static cond_handle* cond_get(u32 cond_id)
static void cond_free(u32 cond_id)
{
if (cond_id - 1 >= u32{INT16_MAX})
if (cond_id - 1 >= u32{UINT16_MAX})
{
fprintf(stderr, "cond_free(): bad id %u" HERE "\n", cond_id);
std::abort();
}
// Dereference, destroy on last ref
if (--s_cond_refs[cond_id - 1])
{
return;
}
// Call the destructor
cond_get(cond_id)->~cond_handle();
@ -273,6 +282,46 @@ static void cond_free(u32 cond_id)
s_cond_sema--;
}
static u32 cond_lock(atomic_t<u16>* sema)
{
while (const u32 cond_id = sema->load())
{
const auto [old, ok] = s_cond_refs[cond_id - 1].fetch_op([](u16& ref)
{
if (!ref || ref == UINT16_MAX)
{
// Don't reference already deallocated semaphore
return false;
}
ref++;
return true;
});
if (ok)
{
return cond_id;
}
if (old == UINT16_MAX)
{
fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " for a single address reached in atomic notifier.");
}
if (sema->load() != cond_id)
{
// Try again if it changed
continue;
}
else
{
break;
}
}
return 0;
}
namespace
{
#define MAX_THREADS (56)
@ -284,12 +333,14 @@ namespace
// Reference counter, owning pointer, collision bit and optionally selected slot
atomic_t<u64> addr_ref{};
// Semaphores (allocated in reverse order), 0 means empty, 0x8000 is notifier lock
private:
// Semaphores (allocated in reverse order), empty are zeros
atomic_t<u16> sema_data[MAX_THREADS]{};
// Allocated semaphore bits (to make total size 128)
atomic_t<u64> sema_bits{};
public:
atomic_t<u16>* sema_alloc()
{
const auto [bits, ok] = sema_bits.fetch_op([](u64& bits)
@ -307,7 +358,7 @@ namespace
if (ok) [[likely]]
{
// Find lowest clear bit
return &sema_data[std::countr_one(bits)];
return get_sema(std::countr_one(bits));
}
// TODO: support extension if reached
@ -315,6 +366,25 @@ namespace
return nullptr;
}
atomic_t<u16>* get_sema(u32 id)
{
verify(HERE), id < MAX_THREADS;
return &sema_data[(MAX_THREADS - 1) - id];
}
u64 get_sema_bits() const
{
return sema_bits & ((1ull << MAX_THREADS) - 1);
}
void reset_sema_bit(atomic_t<u16>* sema)
{
verify(HERE), sema >= sema_data && sema < std::end(sema_data);
sema_bits &= ~(1ull << ((MAX_THREADS - 1) - (sema - sema_data)));
}
void sema_free(atomic_t<u16>* sema)
{
if (sema < sema_data || sema >= std::end(sema_data))
@ -323,19 +393,11 @@ namespace
std::abort();
}
const u32 cond_id = sema->fetch_and(0x8000);
if (!cond_id || cond_id >> 15)
{
// Delegated cleanup
return;
}
// Free
cond_free(cond_id);
// Try to deallocate semaphore (may be delegated to a notifier)
cond_free(sema->exchange(0));
// Clear sema bit
sema_bits &= ~(1ull << (sema - sema_data));
reset_sema_bit(sema);
}
};
@ -607,7 +669,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
if (cond_id == 0)
{
fmt::raw_error("Thread limit " STRINGIZE(INT16_MAX) " reached in atomic wait.");
fmt::raw_error("Thread limit " STRINGIZE(UINT16_MAX) " reached in atomic wait.");
}
auto sema = slot->sema_alloc();
@ -790,29 +852,20 @@ __vectorcall
#endif
alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i mask, __m128i new_value)
{
auto [cond_id, ok] = sema->fetch_op([](u16& id)
{
// Check if not zero and not locked
if (!id || id & 0x8000)
{
return false;
}
const u32 cond_id = cond_lock(sema);
// Set notify lock
id |= 0x8000;
return true;
});
if (!ok) [[unlikely]]
if (!cond_id)
{
return false;
}
const auto cond = cond_get(cond_id);
ok = false;
verify(HERE), cond;
if (cond && cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)))
bool ok = false;
if (cond->sync && (!size ? (!info || cond->tid == info) : cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv)))
{
if ((!size && cond->forced_wakeup()) || (size && cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2)))
{
@ -841,18 +894,8 @@ alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i ma
}
}
// Remove lock, check if cond_id is already removed (leaving only 0x8000)
if (sema->fetch_and(0x7fff) == 0x8000)
{
cond_free(cond_id);
// Cleanup, a little hacky obtainment of the host variable
const auto slot = std::launder(reinterpret_cast<sync_var*>(reinterpret_cast<u64>(sema) & -128));
// Remove slot bit
slot->sema_bits &= ~(1ull << (sema - slot->sema_data));
}
// Remove lock, possibly deallocate cond
cond_free(cond_id);
return ok;
}
@ -895,9 +938,9 @@ bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id)
u64 progress = 0;
for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1)
for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
{
const auto sema = &slot->sema_data[std::countr_zero(bits)];
const auto sema = slot->get_sema(std::countr_zero(bits));
// Forced notification
if (alert_sema(sema, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128()))
@ -937,9 +980,9 @@ atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m12
u64 progress = 0;
for (u64 bits = slot->sema_bits; bits; bits &= bits - 1)
for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
{
const auto sema = &slot->sema_data[std::countr_zero(bits)];
const auto sema = slot->get_sema(std::countr_zero(bits));
if (alert_sema(sema, data, progress, size, mask, new_value))
{
@ -969,14 +1012,9 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
s_tls_notify_cb(data, 0);
u64 progress = 0;
#if defined(_WIN32) && !defined(USE_FUTEX)
// Special path for Windows 7
if (!NtAlertThreadByThreadId)
#elif defined(USE_STD)
{
// Make a copy to filter out waiters that fail some checks
u64 copy = slot->sema_bits.load();
u64 copy = slot->get_sema_bits();
u64 lock = 0;
u32 lock_ids[64]{};
@ -984,20 +1022,9 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
{
const u32 id = std::countr_zero(bits);
const auto sema = &slot->sema_data[id];
const auto sema = slot->get_sema(id);
auto [cond_id, ok] = sema->fetch_op([](u16& id)
{
if (!id || id & 0x8000)
{
return false;
}
id |= 0x8000;
return true;
});
if (ok)
if (const u32 cond_id = cond_lock(sema))
{
// Add lock bit for cleanup
lock |= 1ull << id;
@ -1005,10 +1032,13 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
const auto cond = cond_get(cond_id);
if (cond && cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))
verify(HERE), cond;
if (cond->sync && cond->ptr == data && cmp_mask(size, mask, new_value, cond->size, cond->mask, cond->oldv))
{
if (cond->sync.load() == 1 && cond->sync.compare_and_swap_test(1, 2))
{
// Ok.
continue;
}
}
@ -1025,11 +1055,12 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
{
const u32 id = std::countr_zero(bits);
const auto sema = &slot->sema_data[id];
#ifdef USE_STD
const auto sema = slot->get_sema(id);
const auto cond = cond_get(lock_ids[id]);
#if defined(USE_FUTEX)
continue;
#elif defined(USE_STD)
// Optimistic non-blocking path
if (cond->mtx.try_lock())
{
@ -1041,13 +1072,19 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
continue;
}
#elif defined(_WIN32)
if (NtAlertThreadByThreadId)
{
continue;
}
static LARGE_INTEGER instant{};
if (NtReleaseKeyedEvent(nullptr, sema, 1, &instant) != NTSTATUS_SUCCESS)
{
// Failed to notify immediately
continue;
}
#endif
s_tls_notify_cb(data, ++progress);
// Remove the bit from next stage
@ -1058,13 +1095,27 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
// Proceed with remaining bits using "normal" blocking waiting
for (u64 bits = copy; bits; bits &= bits - 1)
{
#ifdef USE_STD
const auto cond = cond_get(lock_ids[std::countr_zero(bits)]);
const u32 id = std::countr_zero(bits);
const auto sema = slot->get_sema(id);
const auto cond = cond_get(lock_ids[id]);
#if defined(USE_FUTEX)
// Always alerted (result isn't meaningful here)
futex(&cond->sync, FUTEX_WAKE_PRIVATE, 0x7fff'ffff);
#elif defined(USE_STD)
cond->mtx.lock();
cond->mtx.unlock();
cond->cond.notify_all();
#elif defined(_WIN32)
NtReleaseKeyedEvent(nullptr, sema, 1, nullptr);
if (NtAlertThreadByThreadId)
{
NtAlertThreadByThreadId(cond->tid);
}
else
{
NtReleaseKeyedEvent(nullptr, sema, 1, nullptr);
}
#endif
s_tls_notify_cb(data, ++progress);
}
@ -1072,26 +1123,17 @@ atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m12
// Cleanup locked notifiers
for (u64 bits = lock; bits; bits &= bits - 1)
{
const u32 id = std::countr_zero(bits);
const auto sema = &slot->sema_data[id];
if (sema->fetch_and(0x7fff) == 0x8000)
{
cond_free(lock_ids[id]);
slot->sema_bits &= ~(1ull << id);
}
cond_free(lock_ids[std::countr_zero(bits)]);
}
s_tls_notify_cb(data, -1);
return;
}
#endif
for (u64 bits = slot->sema_bits.load(); bits; bits &= bits - 1)
// Unused, let's keep for reference
for (u64 bits = slot->get_sema_bits(); bits; bits &= bits - 1)
{
const auto sema = &slot->sema_data[std::countr_zero(bits)];
const auto sema = slot->get_sema(std::countr_zero(bits));
if (alert_sema(sema, data, progress, size, mask, new_value))
{