Rewrite cond_variable to use waitable atomics

Increase max_timeout and fix max_timeout usage
Nekotekina 2019-09-09 11:09:30 +03:00
parent 67f31c17d1
commit d13ff285d1
5 changed files with 105 additions and 58 deletions
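The change replaces the previous balanced_wait_until/balanced_awaken helpers with waits on the atomic counter itself. As a rough illustration of the waitable-atomic pattern (standard C++20 std::atomic shown here, not RPCS3's atomic_t, which additionally accepts a timeout), a minimal sketch:

#include <atomic>
#include <cstdint>
#include <thread>

std::atomic<std::uint32_t> value{0};

// Waiter side: sleep while the value still equals the snapshot, then consume the signal bit.
void waiter()
{
    std::uint32_t old = value.load();
    while ((old & 1) == 0)
    {
        value.wait(old);   // blocks only while value == old
        old = value.load();
    }
    value.fetch_and(~1u);
}

// Signaler side: publish the signal bit, then wake one blocked waiter.
void signaler()
{
    value.fetch_or(1u);
    value.notify_one();
}

int main()
{
    std::thread t(waiter);
    signaler();
    t.join();
}

The waiter re-checks the value after every wake because wait() may return spuriously or after an unrelated change; the same re-check structure appears in _wait_for() below.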

View File

@@ -1778,11 +1778,13 @@ void thread_base::finalize() noexcept
--g_thread_count;
}
bool thread_ctrl::_wait_for(u64 usec)
void thread_ctrl::_wait_for(u64 usec)
{
auto _this = g_tls_this_thread;
do
std::unique_lock lock(_this->m_mutex, std::defer_lock);
while (true)
{
// Mutex is unlocked at the start and after the waiting
if (u32 sig = _this->m_signal.load())
@@ -1790,17 +1792,20 @@ bool thread_ctrl::_wait_for(u64 usec)
if (sig & 1)
{
_this->m_signal &= ~1;
return true;
return;
}
}
if (usec == 0)
{
// No timeout: return immediately
return false;
return;
}
_this->m_mutex.lock();
if (!lock)
{
lock.lock();
}
// Double-check the value
if (u32 sig = _this->m_signal.load())
@@ -1808,15 +1813,17 @@ bool thread_ctrl::_wait_for(u64 usec)
if (sig & 1)
{
_this->m_signal &= ~1;
_this->m_mutex.unlock();
return true;
return;
}
}
}
while (_this->m_cond.wait_unlock(std::exchange(usec, usec > cond_variable::max_timeout ? -1 : 0), _this->m_mutex));
// Timeout
return false;
_this->m_cond.wait_unlock(usec, lock);
if (usec < cond_variable::max_timeout)
{
usec = 0;
}
}
}
thread_base::thread_base(std::string_view name)
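A note on the rewritten loop: after a single wait, a request shorter than cond_variable::max_timeout has usec zeroed, so the next iteration takes the "usec == 0" early return, i.e. the timeout is treated as elapsed, while longer (or infinite) requests simply keep re-waiting. A minimal sketch of that one-shot timeout pattern, with a hypothetical wait_once callback and backend limit (neither is RPCS3's actual API):

#include <cstdint>

constexpr std::uint64_t backend_max_usec = 1'000'000; // assumed backend wait limit

template <typename WaitOnce>
void wait_for(std::uint64_t usec, WaitOnce wait_once)
{
    while (true)
    {
        if (usec == 0)
        {
            return; // nothing left to wait for: report timeout
        }
        if (wait_once(usec))
        {
            return; // signaled before the timeout
        }
        if (usec < backend_max_usec)
        {
            usec = 0; // one bounded wait already covered the whole request
        }
    }
}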

View File

@@ -185,7 +185,7 @@ class thread_ctrl final
static atomic_t<native_core_arrangement> g_native_core_layout;
// Internal waiting function, may throw. Infinite value is -1.
static bool _wait_for(u64 usec);
static void _wait_for(u64 usec);
friend class thread_base;
@@ -235,9 +235,9 @@ public:
}
// Wait once with timeout. May spuriously return false.
static inline bool wait_for(u64 usec)
static inline void wait_for(u64 usec)
{
return _wait_for(usec);
_wait_for(usec);
}
// Wait.

View File

@@ -8,50 +8,57 @@
#include <thread>
#endif
bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
// use constants, increase signal space
void cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
{
verify("cond_variable overflow" HERE), (_old & 0xffff) != 0xffff; // Very unlikely: it requires 65535 distinct threads to wait simultaneously
// Not supposed to fail
verify(HERE), _old;
return balanced_wait_until(m_value, _timeout, [&](u32& value, auto... ret) -> int
// Wait with timeout
m_value.wait(_old, atomic_wait_timeout{_timeout > max_timeout ? UINT64_MAX : _timeout * 1000});
// Cleanup
m_value.atomic_op([](u32& value)
{
if (value >> 16)
{
// Success
value -= 0x10001;
return +1;
}
value -= c_waiter_mask & -c_waiter_mask;
if constexpr (sizeof...(ret))
if ((value & c_waiter_mask) == 0)
{
// Retire
value -= 1;
return -1;
// Last waiter removed, clean signals
value = 0;
}
return 0;
});
#ifdef _WIN32
if (_old >= 0x10000 && !OptWaitOnAddress && m_value)
{
// Workaround possibly stolen signal
imp_wake(1);
}
#endif
}
void cond_variable::imp_wake(u32 _count) noexcept
{
// TODO (notify_one)
balanced_awaken<true>(m_value, m_value.atomic_op([&](u32& value) -> u32
const auto [_old, ok] = m_value.fetch_op([](u32& value)
{
// Subtract already signaled number from total amount of waiters
const u32 can_sig = (value & 0xffff) - (value >> 16);
const u32 num_sig = std::min<u32>(can_sig, _count);
if (!value || (value & c_signal_mask) == c_signal_mask)
{
return false;
}
value += num_sig << 16;
return num_sig;
}));
// Add signal
value += c_signal_mask & -c_signal_mask;
return true;
});
if (!ok || !_count)
{
return;
}
if (_count > 1 || ((_old + (c_signal_mask & -c_signal_mask)) & c_signal_mask) == c_signal_mask)
{
// Resort to notify_all if signal count reached max
m_value.notify_all();
}
else
{
m_value.notify_one();
}
}
bool shared_cond::imp_wait(u32 slot, u64 _timeout) noexcept

View File

@@ -11,9 +11,31 @@ class cond_variable
// Internal waiter counter
atomic_t<u32> m_value{0};
enum : u32
{
c_waiter_mask = 0x1fff,
c_signal_mask = 0xffffffff & ~c_waiter_mask,
};
protected:
// Increment waiter count
u32 add_waiter() noexcept
{
return m_value.atomic_op([](u32& value) -> u32
{
if ((value & c_signal_mask) == c_signal_mask || (value & c_waiter_mask) == c_waiter_mask)
{
// Signal or waiter overflow, return immediately
return 0;
}
value += c_waiter_mask & -c_waiter_mask;
return value;
});
}
// Internal waiting function
bool imp_wait(u32 _old, u64 _timeout) noexcept;
void imp_wait(u32 _old, u64 _timeout) noexcept;
// Try to notify up to _count threads
void imp_wake(u32 _count) noexcept;
@@ -23,22 +45,33 @@ public:
// Intrusive wait algorithm for lockable objects
template <typename T>
bool wait(T& object, u64 usec_timeout = -1)
void wait(T& object, u64 usec_timeout = -1) noexcept
{
const u32 _old = m_value.fetch_add(1); // Increment waiter counter
const u32 _old = add_waiter();
if (!_old)
{
return;
}
object.unlock();
const bool res = imp_wait(_old, usec_timeout);
imp_wait(_old, usec_timeout);
object.lock();
return res;
}
// Unlock all specified objects but don't lock them again
template <typename... Locks>
bool wait_unlock(u64 usec_timeout, Locks&&... locks)
void wait_unlock(u64 usec_timeout, Locks&&... locks)
{
const u32 _old = m_value.fetch_add(1); // Increment waiter counter
const u32 _old = add_waiter();
(..., std::forward<Locks>(locks).unlock());
return imp_wait(_old, usec_timeout);
if (!_old)
{
return;
}
imp_wait(_old, usec_timeout);
}
// Wake one thread
@@ -55,11 +88,11 @@ public:
{
if (m_value)
{
imp_wake(65535);
imp_wake(-1);
}
}
static constexpr u64 max_timeout = u64{UINT32_MAX} / 1000 * 1000000;
static constexpr u64 max_timeout = UINT64_MAX / 1000;
};
// Condition variable fused with a pseudo-mutex supporting only reader locks (up to 32 readers).
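For reference, the new header packs two counters into m_value: the low 13 bits (c_waiter_mask = 0x1fff) hold the waiter count and the remaining high bits (c_signal_mask) hold the pending-signal count; (mask & -mask) isolates the lowest set bit of a mask, i.e. the increment unit of that field. The enlarged max_timeout of UINT64_MAX / 1000 microseconds also leaves room for the microsecond-to-nanosecond conversion (_timeout * 1000) in imp_wait without overflowing. A small constexpr sketch of the arithmetic (illustrative only, not the project's code):

#include <cstdint>

constexpr std::uint32_t c_waiter_mask = 0x1fff;
constexpr std::uint32_t c_signal_mask = 0xffffffff & ~c_waiter_mask;

static_assert((c_waiter_mask & (0u - c_waiter_mask)) == 0x0001); // +1 per waiter
static_assert((c_signal_mask & (0u - c_signal_mask)) == 0x2000); // +0x2000 per signal

constexpr std::uint64_t max_timeout = UINT64_MAX / 1000;  // microseconds
static_assert(max_timeout * 1000 >= max_timeout);         // usec -> nsec conversion cannot wrap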

View File

@@ -236,10 +236,10 @@ public:
static_assert(UINT64_MAX / cond_variable::max_timeout >= g_cfg.core.clocks_scale.max, "timeout may overflow during scaling");
// Clamp to max timeout accepted
if (usec > cond_variable::max_timeout) usec = cond_variable::max_timeout;
const u64 max_usec = cond_variable::max_timeout * 100 / g_cfg.core.clocks_scale.max;
// Now scale the result
usec = (usec * g_cfg.core.clocks_scale) / 100;
usec = (std::min<u64>(usec, max_usec) * g_cfg.core.clocks_scale) / 100;
#ifdef __linux__
// TODO: Confirm whether Apple or any BSD can benefit from this as well
@@ -271,7 +271,7 @@ public:
// Do not wait for the last quantum to avoid loss of accuracy
thread_ctrl::wait_for(remaining - ((remaining % host_min_quantum) + host_min_quantum));
#else
// Wait on multiple of min quantum for large durations to avoid overloading low thread cpus
thread_ctrl::wait_for(remaining - (remaining % host_min_quantum));
#endif
}
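The last hunk ties the clock scaling to the new limit: clamping to max_usec = cond_variable::max_timeout * 100 / g_cfg.core.clocks_scale.max before multiplying keeps the scaled value both within u64 and within max_timeout. A rough constexpr check of that bound, assuming a clocks_scale maximum of 1000 (the real limit lives in the config definition, which this diff does not show):

#include <cstdint>

constexpr std::uint64_t max_timeout = UINT64_MAX / 1000;  // new cond_variable::max_timeout
constexpr std::uint64_t scale_max   = 1000;               // assumed g_cfg.core.clocks_scale.max
constexpr std::uint64_t max_usec    = max_timeout * 100 / scale_max;

static_assert(UINT64_MAX / max_timeout >= scale_max);     // mirrors the static_assert in the diff
static_assert(max_usec <= UINT64_MAX / scale_max);        // min(usec, max_usec) * scale cannot overflow
static_assert(max_usec * scale_max / 100 <= max_timeout); // scaled result stays within max_timeout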