atomic.cpp: remove load() from notify functions

Only compare masks for overlap in the second overload (the one with a mask provided).
An explicit "new value" can be supplied via the new 3-argument overloads.
Also rename atomic_storage_futex -> atomic_wait_engine.
This commit is contained in:
Nekotekina 2020-11-06 02:43:14 +03:00
parent 9fb8d449fe
commit 557f0c5a8a
4 changed files with 89 additions and 58 deletions

View File

@ -1848,7 +1848,7 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*))
thread_ctrl::g_tls_error_callback = error_cb;
// Initialize atomic wait callback
atomic_storage_futex::set_wait_callback(wait_cb);
atomic_wait_engine::set_wait_callback(wait_cb);
g_tls_log_prefix = []
{
@ -1908,7 +1908,7 @@ void thread_base::notify_abort() noexcept
while (auto ptr = m_state_notifier.load())
{
// Since this function is not perfectly implemented, run it in a loop
if (atomic_storage_futex::raw_notify(ptr, tid))
if (atomic_wait_engine::raw_notify(ptr, tid))
{
break;
}
@ -1973,7 +1973,7 @@ bool thread_base::finalize(thread_state result_state) noexcept
void thread_base::finalize() noexcept
{
atomic_storage_futex::set_wait_callback(nullptr);
atomic_wait_engine::set_wait_callback(nullptr);
g_tls_log_prefix = []() -> std::string { return {}; };
thread_ctrl::g_tls_this_thread = nullptr;
}

View File

@ -448,7 +448,7 @@ void cpu_thread::operator()()
return;
}
atomic_storage_futex::set_notify_callback([](const void*, u64 progress)
atomic_wait_engine::set_notify_callback([](const void*, u64 progress)
{
static thread_local bool wait_set = false;
@ -514,7 +514,7 @@ void cpu_thread::operator()()
ptr->compare_and_swap(_this, nullptr);
}
atomic_storage_futex::set_notify_callback(nullptr);
atomic_wait_engine::set_notify_callback(nullptr);
g_tls_log_control = [](const char*, u64){};

View File

@ -108,6 +108,15 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
return true;
}
// Compare only masks, new value is not available in this mode
if ((size1 | size2) == umax)
{
// Simple mask overlap
const auto v0 = _mm_and_si128(mask1, mask2);
const auto v1 = _mm_packs_epi16(v0, v0);
return _mm_cvtsi128_si64(v1) != 0;
}
// Generate masked value inequality bits
const auto v0 = _mm_and_si128(_mm_and_si128(mask1, mask2), _mm_xor_si128(val1, val2));
@ -137,7 +146,7 @@ cmp_mask(u32 size1, __m128i mask1, __m128i val1, u32 size2, __m128i mask2, __m12
return true;
}
namespace
namespace atomic_wait
{
// Essentially a fat semaphore
struct alignas(64) cond_handle
@ -184,7 +193,7 @@ namespace
}
// Max allowed thread number is chosen to fit in 16 bits
static std::aligned_storage_t<sizeof(cond_handle), alignof(cond_handle)> s_cond_list[UINT16_MAX]{};
static std::aligned_storage_t<sizeof(atomic_wait::cond_handle), alignof(atomic_wait::cond_handle)> s_cond_list[UINT16_MAX]{};
// Used to allow concurrent notifying
static atomic_t<u16> s_cond_refs[UINT16_MAX + 1]{};
@ -234,7 +243,7 @@ static u32 cond_alloc()
const u32 id = group * 64 + std::countr_one(bits);
// Construct inplace before it can be used
new (s_cond_list + id) cond_handle();
new (s_cond_list + id) atomic_wait::cond_handle();
// Add first reference
verify(HERE), !s_cond_refs[id]++;
@ -248,11 +257,11 @@ static u32 cond_alloc()
return 0;
}
static cond_handle* cond_get(u32 cond_id)
static atomic_wait::cond_handle* cond_get(u32 cond_id)
{
if (cond_id - 1 < u32{UINT16_MAX}) [[likely]]
{
return std::launder(reinterpret_cast<cond_handle*>(s_cond_list + (cond_id - 1)));
return std::launder(reinterpret_cast<atomic_wait::cond_handle*>(s_cond_list + (cond_id - 1)));
}
return nullptr;
@ -322,7 +331,7 @@ static u32 cond_lock(atomic_t<u16>* sema)
return 0;
}
namespace
namespace atomic_wait
{
#define MAX_THREADS (56)
@ -407,16 +416,16 @@ namespace
}
// Main hashtable for atomic wait.
alignas(128) static sync_var s_hashtable[s_hashtable_size]{};
alignas(128) static atomic_wait::sync_var s_hashtable[s_hashtable_size]{};
namespace
namespace atomic_wait
{
struct slot_info
{
constexpr slot_info() noexcept = default;
// Branch extension
sync_var branch[48 - s_hashtable_power]{};
atomic_wait::sync_var branch[48 - s_hashtable_power]{};
};
}
@ -424,7 +433,7 @@ namespace
#define MAX_SLOTS (4096)
// Array of slot branch objects
alignas(128) static slot_info s_slot_list[MAX_SLOTS]{};
alignas(128) static atomic_wait::slot_info s_slot_list[MAX_SLOTS]{};
// Allocation bits
static atomic_t<u64, 64> s_slot_bits[MAX_SLOTS / 64]{};
@ -482,7 +491,7 @@ static u64 slot_alloc()
#undef MAX_SLOTS
static sync_var* slot_get(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
static atomic_wait::sync_var* slot_get(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
{
if (!loc)
{
@ -523,7 +532,7 @@ static void slot_free(u64 id)
s_slot_sema--;
}
static void slot_free(std::uintptr_t iptr, sync_var* loc, u64 lv = 0)
static void slot_free(std::uintptr_t iptr, atomic_wait::sync_var* loc, u64 lv = 0)
{
const u64 value = loc->addr_ref.load();
@ -568,7 +577,7 @@ SAFE_BUFFERS void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask)
atomic_wait_engine::wait(const void* data, u32 size, __m128i old_value, u64 timeout, __m128i mask)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
@ -576,7 +585,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
u64 slot_a = -1;
// Found slot object
sync_var* slot = nullptr;
atomic_wait::sync_var* slot = nullptr;
auto install_op = [&](u64& value) -> u64
{
@ -616,7 +625,7 @@ atomic_storage_futex::wait(const void* data, u32 size, __m128i old_value, u64 ti
// Search detail
u64 lv = 0;
for (sync_var* ptr = &s_hashtable[iptr % s_hashtable_size];;)
for (atomic_wait::sync_var* ptr = &s_hashtable[iptr % s_hashtable_size];;)
{
auto [_old, ok] = ptr->addr_ref.fetch_op(install_op);
@ -899,7 +908,7 @@ alert_sema(atomic_t<u16>* sema, const void* data, u64 info, u32 size, __m128i ma
return ok;
}
void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
void atomic_wait_engine::set_wait_callback(bool(*cb)(const void* data))
{
if (cb)
{
@ -911,7 +920,7 @@ void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data))
}
}
void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64))
void atomic_wait_engine::set_notify_callback(void(*cb)(const void*, u64))
{
if (cb)
{
@ -923,7 +932,7 @@ void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64))
}
}
bool atomic_storage_futex::raw_notify(const void* data, u64 thread_id)
bool atomic_wait_engine::raw_notify(const void* data, u64 thread_id)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
@ -965,7 +974,7 @@ void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value)
atomic_wait_engine::notify_one(const void* data, u32 size, __m128i mask, __m128i new_value)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);
@ -998,7 +1007,7 @@ SAFE_BUFFERS void
#ifdef _WIN32
__vectorcall
#endif
atomic_storage_futex::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value)
atomic_wait_engine::notify_all(const void* data, u32 size, __m128i mask, __m128i new_value)
{
const std::uintptr_t iptr = reinterpret_cast<std::uintptr_t>(data);

View File

@ -14,8 +14,16 @@ enum class atomic_wait_timeout : u64
inf = 0xffffffffffffffff,
};
// Unused externally
namespace atomic_wait
{
struct sync_var;
struct slot_info;
struct sema_handle;
}
// Helper for waitable atomics (as in C++20 std::atomic)
struct atomic_storage_futex
struct atomic_wait_engine
{
private:
template <typename T, std::size_t Align>
@ -1242,12 +1250,12 @@ public:
if constexpr (sizeof(T) <= 8)
{
const __m128i old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(old_value));
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
}
else if constexpr (sizeof(T) == 16)
{
const __m128i old = std::bit_cast<__m128i>(old_value);
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), _mm_set1_epi64x(-1));
}
}
@ -1258,73 +1266,87 @@ public:
{
const __m128i old = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(old_value));
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i old = std::bit_cast<__m128i>(old_value);
const __m128i mask = std::bit_cast<__m128i>(mask_value);
atomic_storage_futex::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
atomic_wait_engine::wait(&m_data, sizeof(T), old, static_cast<u64>(timeout), mask);
}
}
void notify_one() noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_one(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
}
atomic_wait_engine::notify_one(&m_data, -1, _mm_set1_epi64x(-1), _mm_setzero_si128());
}
// Notify with mask, allowing to not wake up thread which doesn't wait on this mask
void notify_one(type mask_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new);
atomic_wait_engine::notify_one(&m_data, -1, mask, _mm_setzero_si128());
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_one(&m_data, sizeof(T), mask, _new);
atomic_wait_engine::notify_one(&m_data, -1, mask, _mm_setzero_si128());
}
}
// Notify with mask and value, allowing to not wake up thread which doesn't wait on them
void notify_one(type mask_value, type new_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(new_value));
atomic_wait_engine::notify_one(&m_data, sizeof(T), mask, _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(new_value);
atomic_wait_engine::notify_one(&m_data, sizeof(T), mask, _new);
}
}
void notify_all() noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_all(&m_data, sizeof(T), _mm_set1_epi64x(-1), _new);
}
atomic_wait_engine::notify_all(&m_data, -1, _mm_set1_epi64x(-1), _mm_setzero_si128());
}
// Notify all threads with mask, allowing to not wake up threads which don't wait on them
void notify_all(type mask_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(load()));
atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new);
atomic_wait_engine::notify_all(&m_data, -1, mask, _mm_setzero_si128());
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(load());
atomic_storage_futex::notify_all(&m_data, sizeof(T), mask, _new);
atomic_wait_engine::notify_all(&m_data, -1, mask, _mm_setzero_si128());
}
}
// Notify all threads with mask and value, allowing to not wake up threads which don't wait on them
void notify_all(type mask_value, type new_value) noexcept
{
if constexpr (sizeof(T) <= 8)
{
const __m128i mask = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(mask_value));
const __m128i _new = _mm_cvtsi64_si128(std::bit_cast<get_uint_t<sizeof(T)>>(new_value));
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _new);
}
else if constexpr (sizeof(T) == 16)
{
const __m128i mask = std::bit_cast<__m128i>(mask_value);
const __m128i _new = std::bit_cast<__m128i>(new_value);
atomic_wait_engine::notify_all(&m_data, sizeof(T), mask, _new);
}
}
};