atomic.cpp: combine two atomics (ref ctr and wait ptr)

The combined field should fit into a cache line perfectly.
This also makes the pointer check more robust.
Nekotekina 2020-11-12 00:29:28 +03:00
parent ad013d59f4
commit 350b704cd7
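The change packs the waiter's address and the reference counter into a single 64-bit atomic word: the pointer (assumed to fit in 47 bits) occupies the upper bits and a 17-bit reference counter the lower bits. One u64 replaces the previous pair of atomics (atomic_t<const void*> ptr and atomic_t<u16> refs), so both fields share a cache line and the pointer can be validated by the same atomic operation that takes a reference. A minimal sketch of the layout, assuming a 64-bit platform; the helper names below are illustrative and not part of the commit:

#include <cstdint>

constexpr std::uint64_t ref_mask = (1u << 17) - 1;               // low 17 bits: reference counter (mirrors s_ref_mask)
constexpr std::uint64_t ptr_mask = (std::uint64_t{1} << 47) - 1; // 47 usable pointer bits

constexpr std::uint64_t encode(std::uint64_t iptr, std::uint64_t refs)
{
    return ((iptr & ptr_mask) << 17) | (refs & ref_mask);
}

constexpr std::uint64_t ptr_of(std::uint64_t v)  { return v >> 17; }
constexpr std::uint64_t refs_of(std::uint64_t v) { return v & ref_mask; }

// cond_handle::init() below stores exactly encode(iptr, 1): one reference held by the waiter.
static_assert(refs_of(encode(0x7fff'dead'beef, 1)) == 1);
static_assert(ptr_of(encode(0x7fff'dead'beef, 1)) == 0x7fff'dead'beef);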


@ -25,6 +25,9 @@
 // Total number of entries, should be a power of 2.
 static constexpr std::size_t s_hashtable_size = 1u << 18;
+// Reference counter combined with shifted pointer (which is assumed to be 47 bit)
+static constexpr std::uintptr_t s_ref_mask = (1u << 17) - 1;
 // Callback for wait() function, returns false if wait should return
 static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*){ return true; };
@ -341,17 +344,16 @@ namespace atomic_wait
 {
 constexpr cond_handle() noexcept = default;
-atomic_t<const void*> ptr{};
+// Combined pointer (most significant 47 bits) and ref counter (17 least significant bits)
+atomic_t<u64> ptr_ref{};
 u64 tid{};
 __m128i mask{};
 __m128i oldv{};
-u64 tsc0{};
-u16 link{};
+// Temporarily reduced unique tsc stamp to 48 bits to make space for refs (TODO)
+u64 tsc0 : 48 = 0;
+u64 link : 16 = 0;
 u8 size{};
 u8 flag{};
-atomic_t<u16> refs{};
 atomic_t<u32> sync{};
 #ifdef USE_STD
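The tsc0 and link members now share one u64 via bit-fields (48 + 16 bits), which is what the "reduced ... to 48 bits to make space for refs" comment refers to; merging them (together with dropping the separate refs member) is part of keeping the struct compact after ptr_ref grew to a full 64 bits. A rough stand-in, assuming the usual bit-field packing on mainstream compilers (the struct name is made up):

#include <cstdint>

struct stamp_and_link // hypothetical stand-in for the two cond_handle bit-fields
{
    std::uint64_t tsc0 : 48 = 0; // unique tsc stamp, truncated to 48 bits
    std::uint64_t link : 16 = 0; // 16-bit link field kept alongside it
};

static_assert(sizeof(stamp_and_link) == 8); // both fields packed into one 8-byte word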
@ -360,9 +362,8 @@ namespace atomic_wait
 un_t<std::mutex> mtx{};
 #endif
-void init(const void* data)
+void init(std::uintptr_t iptr)
 {
-ptr.release(data);
 #ifdef _WIN32
 tid = GetCurrentThreadId();
 #else
@ -374,13 +375,11 @@ namespace atomic_wait
 mtx.init(mtx);
 #endif
-verify(HERE), !refs;
-refs.release(1);
+verify(HERE), !ptr_ref.exchange((iptr << 17) | 1);
 }
 void destroy()
 {
-ptr.release(nullptr);
 tid = 0;
 tsc0 = 0;
 link = 0;
@ -533,7 +532,7 @@ static u32
 #ifdef _WIN32
 __vectorcall
 #endif
-cond_alloc(const void* data, __m128i mask)
+cond_alloc(std::uintptr_t iptr, __m128i mask)
 {
 // Determine whether there is a free slot or not
 if (!s_cond_sema.try_inc(UINT16_MAX + 1))
@ -581,7 +580,7 @@ cond_alloc(const void* data, __m128i mask)
 // Initialize new "semaphore"
 s_cond_list[id].mask = mask;
-s_cond_list[id].init(data);
+s_cond_list[id].init(iptr);
 return id;
 }
@ -603,7 +602,22 @@ static void cond_free(u32 cond_id)
 const auto cond = s_cond_list + cond_id;
 // Dereference, destroy on last ref
-if (--cond->refs)
+const bool last = cond->ptr_ref.atomic_op([](u64& val)
+{
+verify(HERE), val & s_ref_mask;
+val--;
+if ((val & s_ref_mask) == 0)
+{
+val = 0;
+return true;
+}
+return false;
+});
+if (!last)
 {
 return;
 }
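cond_free now performs the dereference and the "is this the last reference?" test in a single atomic read-modify-write on ptr_ref, and the thread that drops the counter to zero also clears the pointer bits, marking the slot dead before it can be recycled. atomic_t::atomic_op is the project's own helper; a sketch of what this particular use boils down to, expressed with std::atomic and a compare-exchange loop (an assumed equivalent, not the actual implementation):

#include <atomic>
#include <cstdint>

// Decrement the low 17 bits; report whether this call released the last reference.
bool deref_last(std::atomic<std::uint64_t>& ptr_ref)
{
    constexpr std::uint64_t ref_mask = (1u << 17) - 1;

    std::uint64_t old = ptr_ref.load();

    while (true)
    {
        // A zero counter here would indicate a double free (the real code verifies this).
        std::uint64_t next = old - 1;

        if ((next & ref_mask) == 0)
        {
            next = 0; // last reference: clear the pointer bits too, the slot is now dead
        }

        if (ptr_ref.compare_exchange_weak(old, next))
        {
            return next == 0;
        }
    }
}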
@ -622,20 +636,26 @@ static atomic_wait::cond_handle*
 #ifdef _WIN32
 __vectorcall
 #endif
-cond_id_lock(u32 cond_id, __m128i mask, u64 thread_id = 0, const void* data = nullptr)
+cond_id_lock(u32 cond_id, __m128i mask, u64 thread_id = 0, std::uintptr_t iptr = 0)
 {
 if (cond_id - 1 < u32{UINT16_MAX})
 {
 const auto cond = s_cond_list + cond_id;
-const auto [old, ok] = cond->refs.fetch_op([&](u16& ref)
+const auto [old, ok] = cond->ptr_ref.fetch_op([&](u64& val)
 {
-if (!ref || ref == UINT16_MAX)
+if (!val || (val & s_ref_mask) == s_ref_mask)
 {
 // Don't reference already deallocated semaphore
 return false;
 }
+if (iptr && (val >> 17) != iptr)
+{
+// Pointer mismatch
+return false;
+}
 const u32 sync_val = cond->sync;
 if (sync_val == 0 || sync_val == 3)
@ -653,20 +673,12 @@ cond_id_lock(u32 cond_id, __m128i mask, u64 thread_id = 0, const void* data = nu
 }
 }
-if (data)
-{
-if (cond->ptr != data)
-{
-return false;
-}
-}
 if (_mm_cvtsi128_si64(_mm_packs_epi16(mask12, mask12)) == 0)
 {
 return false;
 }
-ref++;
+val++;
 return true;
 });
@ -675,9 +687,9 @@ cond_id_lock(u32 cond_id, __m128i mask, u64 thread_id = 0, const void* data = nu
 return cond;
 }
-if (old == UINT16_MAX)
+if ((old & s_ref_mask) == s_ref_mask)
 {
-fmt::raw_error("Reference count limit " STRINGIZE(UINT16_MAX) " reached in an atomic notifier.");
+fmt::raw_error("Reference count limit (131071) reached in an atomic notifier.");
 }
 }
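cond_id_lock now receives the shifted pointer instead of the raw data pointer and validates it inside the same fetch_op that takes the reference, so a handle that was freed and reused for a different address can no longer be locked through a stale id; this is the "more robust" pointer check from the commit message. The lambda refuses when the slot is dead (val == 0), when the counter is saturated at s_ref_mask, or when the stored pointer bits do not match (the real code additionally checks the sync state and the wait mask). Roughly, in terms of std::atomic; try_ref is a made-up name and fetch_op is the project's wrapper:

#include <atomic>
#include <cstdint>
#include <optional>

// Try to take one reference on the packed word; return the previous value on success.
std::optional<std::uint64_t> try_ref(std::atomic<std::uint64_t>& ptr_ref, std::uint64_t iptr)
{
    constexpr std::uint64_t ref_mask = (1u << 17) - 1;

    std::uint64_t old = ptr_ref.load();

    while (true)
    {
        if (!old || (old & ref_mask) == ref_mask)
        {
            return std::nullopt; // dead slot or saturated counter
        }

        if (iptr && (old >> 17) != iptr)
        {
            return std::nullopt; // slot was reallocated for a different address
        }

        if (ptr_ref.compare_exchange_weak(old, old + 1))
        {
            return old;
        }
    }
}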
@ -716,7 +728,7 @@ namespace atomic_wait
 root_info* slot_free(atomic_t<u16>* slot) noexcept;
 template <typename F>
-auto slot_search(const void* data, u64 thread_id, __m128i mask, F func) noexcept;
+auto slot_search(std::uintptr_t iptr, u64 thread_id, __m128i mask, F func) noexcept;
 };
 static_assert(sizeof(root_info) == 128);
@ -913,7 +925,7 @@ atomic_wait::root_info* atomic_wait::root_info::slot_free(atomic_t<u16>* slot) n
 }
 template <typename F>
-FORCE_INLINE auto atomic_wait::root_info::slot_search(const void* data, u64 thread_id, __m128i mask, F func) noexcept
+FORCE_INLINE auto atomic_wait::root_info::slot_search(std::uintptr_t iptr, u64 thread_id, __m128i mask, F func) noexcept
 {
 const u64 bits_val = this->bits.load();
 const u64 max_order = bits_val >> max_threads;
@ -940,7 +952,7 @@ FORCE_INLINE auto atomic_wait::root_info::slot_search(const void* data, u64 thre
 for (u32 i = 0; i < cond_max; i++)
 {
-if (cond_id_lock(cond_ids[i], mask, thread_id, data))
+if (cond_id_lock(cond_ids[i], mask, thread_id, iptr))
 {
 if (func(cond_ids[i]))
 {
@ -974,7 +986,7 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 {
 const auto stamp0 = atomic_wait::get_unique_tsc();
-const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data) & (~s_ref_mask >> 17);
 const auto root = &s_hashtable[iptr % s_hashtable_size];
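wait() and the notify paths further down now truncate the address to the 47 bits that fit in ptr_ref before hashing it and before handing it to cond_alloc / cond_id_lock; `~s_ref_mask >> 17` is exactly that 47-bit mask. A quick check of the arithmetic, assuming a 64-bit std::uintptr_t as the surrounding code does:

#include <cstdint>

constexpr std::uintptr_t s_ref_mask = (1u << 17) - 1; // mirrors the constant added above

// Clearing the low 17 bits and shifting right by 17 leaves exactly 47 set bits.
static_assert((~s_ref_mask >> 17) == (std::uintptr_t{1} << 47) - 1);

// Canonical user-space addresses (47 significant bits) pass through unchanged.
constexpr std::uintptr_t example = 0x0000'7f12'3456'789a;
static_assert((example & (~s_ref_mask >> 17)) == example);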
@ -1001,19 +1013,19 @@ atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 time
 }
 }
-iptr_ext[ext_size] = reinterpret_cast<std::uintptr_t>(e->data);
+iptr_ext[ext_size] = reinterpret_cast<std::uintptr_t>(e->data) & (~s_ref_mask >> 17);
 root_ext[ext_size] = &s_hashtable[iptr & s_hashtable_size];
 ext_size++;
 }
 }
-const u32 cond_id = cond_alloc(data, mask);
+const u32 cond_id = cond_alloc(iptr, mask);
 u32 cond_id_ext[atomic_wait::max_list - 1]{};
 for (u32 i = 0; i < ext_size; i++)
 {
-cond_id_ext[i] = cond_alloc(ext[i].data, ext[i].mask);
+cond_id_ext[i] = cond_alloc(iptr_ext[i], ext[i].mask);
 }
 const auto slot = root->slot_alloc(iptr);
@ -1230,7 +1242,7 @@ alert_sema(u32 cond_id, const void* data, u64 tid, u32 size, __m128i mask, __m12
 u32 ok = 0;
-if (!size ? (!tid || cond->tid == tid) : (cond->ptr == data && cmp_mask(size, mask, new_value, cond->size | (cond->flag << 8), cond->mask, cond->oldv)))
+if (!size ? (!tid || cond->tid == tid) : cmp_mask(size, mask, new_value, cond->size | (cond->flag << 8), cond->mask, cond->oldv))
 {
 // Redirect if necessary
 const auto _old = cond;
@ -1313,17 +1325,17 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 {
 const auto cond = s_cond_list + i;
-const auto [old, ok] = cond->refs.fetch_op([&](u16& ref)
+const auto [old, ok] = cond->ptr_ref.fetch_op([&](u64& val)
 {
-if (!ref)
+if (!val)
 {
 // Skip dead semaphores
 return false;
 }
-u32 val = cond->sync.load();
+u32 sync_val = cond->sync.load();
-if (val == 0 || val >= 3)
+if (sync_val == 0 || sync_val >= 3)
 {
 // Skip forced signaled or secondary semaphores
 return false;
@ -1338,9 +1350,9 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 }
 }
-if (ref < UINT16_MAX)
+if ((val & s_ref_mask) < s_ref_mask)
 {
-ref++;
+val++;
 return true;
 }
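As in cond_id_lock, the increment is skipped once the counter reaches s_ref_mask (131071): one more increment would carry into bit 17 and silently change the stored pointer bits. The carry hazard, spelled out (illustrative values only):

#include <cstdint>

constexpr std::uint64_t ref_mask = (1u << 17) - 1; // 131071

// Incrementing a saturated counter would spill into the pointer bits:
static_assert((((0x1234ull << 17) | ref_mask) + 1) == ((0x1234ull + 1) << 17));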
@ -1360,22 +1372,27 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 if (thread_id)
 {
 // Only if thread_id is speficied, stop only one and return true.
-if (old < UINT16_MAX)
+if ((old & s_ref_mask) < s_ref_mask)
 {
 cond_free(i);
 }
 return true;
 }
 }
 }
-if (old < UINT16_MAX)
+if ((old & s_ref_mask) < s_ref_mask)
 {
 cond_free(i);
 }
 }
 }
 return false;
 }
-const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data) & (~s_ref_mask >> 17);
 auto* const root = &s_hashtable[iptr % s_hashtable_size];
@ -1383,7 +1400,7 @@ bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
 u64 progress = 0;
-root->slot_search(data, thread_id, _mm_set1_epi64x(-1), [&](u32 cond_id)
+root->slot_search(iptr, thread_id, _mm_set1_epi64x(-1), [&](u32 cond_id)
 {
 // Forced notification
 if (alert_sema(cond_id, data, thread_id, 0, _mm_setzero_si128(), _mm_setzero_si128()))
@ -1413,7 +1430,7 @@ __vectorcall
 #endif
 atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value)
 {
-const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data) & (~s_ref_mask >> 17);
 auto* const root = &s_hashtable[iptr % s_hashtable_size];
@ -1421,7 +1438,7 @@ atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i
 u64 progress = 0;
-root->slot_search(data, 0, mask, [&](u32 cond_id)
+root->slot_search(iptr, 0, mask, [&](u32 cond_id)
 {
 if (alert_sema(cond_id, data, -1, size, mask, new_value))
 {
@ -1441,7 +1458,7 @@ __vectorcall
 #endif
 atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value)
 {
-const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
+const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data) & (~s_ref_mask >> 17);
 auto* const root = &s_hashtable[iptr % s_hashtable_size];
@ -1455,7 +1472,7 @@ atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i
 // Array itself.
 u16 cond_ids[UINT16_MAX + 1];
-root->slot_search(data, 0, mask, [&](u32 cond_id)
+root->slot_search(iptr, 0, mask, [&](u32 cond_id)
 {
 u32 res = alert_sema<true>(cond_id, data, -1, size, mask, new_value);