LV2: Postpone thread notifications until after mutex ownership transfer(s)

Eladash 2022-07-21 11:08:57 +03:00 committed by Ivan
parent 6007fd630f
commit dc851a729e
11 changed files with 138 additions and 61 deletions

View File

@@ -1192,6 +1192,7 @@ DECLARE(lv2_obj::g_pending);
DECLARE(lv2_obj::g_waiting);
DECLARE(lv2_obj::g_to_sleep);
thread_local DECLARE(lv2_obj::g_to_notify){};
thread_local DECLARE(lv2_obj::g_to_awake);
namespace cpu_counter
@@ -1199,22 +1200,22 @@ namespace cpu_counter
void remove(cpu_thread*) noexcept;
}
void lv2_obj::sleep(cpu_thread& cpu, const u64 timeout)
void lv2_obj::sleep(cpu_thread& cpu, const u64 timeout, bool notify_later)
{
vm::temporary_unlock(cpu);
cpu_counter::remove(&cpu);
{
std::lock_guard lock{g_mutex};
sleep_unlocked(cpu, timeout);
sleep_unlocked(cpu, timeout, notify_later);
}
g_to_awake.clear();
}
bool lv2_obj::awake(cpu_thread* const thread, s32 prio)
bool lv2_obj::awake(cpu_thread* const thread, bool notify_later, s32 prio)
{
vm::temporary_unlock();
std::lock_guard lock(g_mutex);
return awake_unlocked(thread, prio);
return awake_unlocked(thread, notify_later, prio);
}
bool lv2_obj::yield(cpu_thread& thread)
@@ -1226,10 +1227,10 @@ bool lv2_obj::yield(cpu_thread& thread)
ppu->raddr = 0; // Clear reservation
}
return awake(&thread, yield_cmd);
return awake(&thread, false, yield_cmd);
}
void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout)
void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
{
const u64 start_time = get_guest_system_time();
@@ -1325,15 +1326,15 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout)
if (!g_to_awake.empty())
{
// Schedule pending entries
awake_unlocked({});
awake_unlocked({}, notify_later);
}
else
{
schedule_all();
schedule_all(notify_later);
}
}
bool lv2_obj::awake_unlocked(cpu_thread* cpu, s32 prio)
bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
{
// Check thread type
AUDIT(!cpu || cpu->id_type() == 1);
@@ -1469,7 +1470,7 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, s32 prio)
}
}
schedule_all();
schedule_all(notify_later);
return changed_queue;
}
@@ -1481,10 +1482,12 @@ void lv2_obj::cleanup()
g_to_sleep.clear();
}
void lv2_obj::schedule_all()
void lv2_obj::schedule_all(bool notify_later)
{
if (g_pending.empty() && g_to_sleep.empty())
{
usz notify_later_idx = notify_later ? 0 : std::size(g_to_notify) - 1;
// Wake up threads
for (usz i = 0, x = std::min<usz>(g_cfg.core.ppu_threads, g_ppu.size()); i < x; i++)
{
@@ -1495,9 +1498,19 @@ void lv2_obj::schedule_all()
ppu_log.trace("schedule(): %s", target->id);
target->state ^= (cpu_flag::signal + cpu_flag::suspend);
target->start_time = 0;
target->state.notify_one(cpu_flag::signal + cpu_flag::suspend);
if (notify_later_idx >= std::size(g_to_notify) - 1)
{
target->state.notify_one(cpu_flag::signal + cpu_flag::suspend);
}
else
{
g_to_notify[notify_later_idx++] = target;
}
}
}
g_to_notify[notify_later_idx] = nullptr;
}
// Check registered timeouts
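
The hunk above is the heart of the scheduler change: when notify_later is set, schedule_all() parks up to three wake-ups in the thread_local g_to_notify array (the last slot is reserved for a nullptr terminator) instead of calling state.notify_one() while g_mutex is still held; with deferral off, or once the buffer is full, it notifies immediately as before. A self-contained sketch of that fill logic, using toy stand-ins for the emulator's cpu_thread and its atomic state (all names below are illustrative only):

```cpp
#include <array>
#include <cstddef>
#include <vector>

struct toy_thread
{
    void notify() {} // stand-in for target->state.notify_one(...)
};

// Per-caller pending list, nullptr-terminated (mirrors thread_local g_to_notify[4])
thread_local std::array<toy_thread*, 4> g_to_notify{};

void schedule_all(const std::vector<toy_thread*>& runnable, bool notify_later)
{
    // The last slot is reserved for the terminator; starting the index there
    // when notify_later is false makes every wake-up take the immediate path.
    std::size_t idx = notify_later ? 0 : g_to_notify.size() - 1;

    for (toy_thread* target : runnable)
    {
        if (idx >= g_to_notify.size() - 1)
        {
            target->notify(); // deferral off, or buffer full: wake right away
        }
        else
        {
            g_to_notify[idx++] = target; // parked until the guard's destructor
        }
    }

    g_to_notify[idx] = nullptr; // terminate the pending list
}
```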

View File

@@ -143,6 +143,8 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
{
if (cond.waiters)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex->mutex);
if (const auto cpu = cond.schedule<ppu_thread>(cond.sq, cond.mutex->protocol))
@@ -158,7 +160,7 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
if (cond.mutex->try_own(*cpu, cpu->id))
{
cond.awake(cpu);
cond.awake(cpu, true);
}
}
}
@@ -234,6 +236,8 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
if (cond.waiters)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex->mutex);
for (auto cpu : cond.sq)
@@ -252,7 +256,7 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
if (cond.mutex->try_own(*cpu, cpu->id))
{
cond.awake(cpu);
cond.awake(cpu, true);
}
return 1;
@@ -294,6 +298,8 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
return -1;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex->mutex);
const u64 syscall_state = sstate.try_read<u64>().second;
@@ -313,7 +319,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
if (ppu.loaded_from_savestate)
{
cond.sleep(ppu, timeout);
cond.sleep(ppu, timeout, true);
return static_cast<u32>(syscall_state >> 32);
}
@@ -326,7 +332,7 @@ error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)
}
// Sleep current thread and schedule mutex waiter
cond.sleep(ppu, timeout);
cond.sleep(ppu, timeout, true);
// Save the recursive value
return count;
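
Every syscall in this and the remaining files adopts the idiom visible above: the lv2_obj::notify_all_t guard is declared immediately before the std::lock_guard. Locals are destroyed in reverse order of declaration, so the guard's destructor (which flushes g_to_notify) runs only after the lock_guard has released the object's mutex; waiters therefore wake up to a mutex whose ownership transfer is already complete. A runnable toy showing just that ordering (notify_guard and logging_lock are illustrative stand-ins):

```cpp
#include <cstdio>
#include <mutex>

struct notify_guard
{
    ~notify_guard() { std::puts("2. flush deferred notifications"); }
};

struct logging_lock
{
    std::mutex& m;
    explicit logging_lock(std::mutex& mtx) : m(mtx) { m.lock(); }
    ~logging_lock() { m.unlock(); std::puts("1. mutex released"); }
};

std::mutex object_mutex;

void signal_waiter()
{
    notify_guard notify;             // declared first, destroyed last
    logging_lock lock(object_mutex); // declared second, destroyed first

    std::puts("0. transfer ownership / queue wake-up under the lock");
}

int main()
{
    signal_waiter(); // prints 0, 1, 2: the wake-up happens after the unlock
}
```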

View File

@@ -107,8 +107,10 @@ std::shared_ptr<lv2_event_queue> lv2_event_queue::find(u64 ipc_key)
extern void resume_spu_thread_group_from_waiting(spu_thread& spu);
CellError lv2_event_queue::send(lv2_event event)
CellError lv2_event_queue::send(lv2_event event, bool notify_later)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex);
if (!exists)
@@ -149,7 +151,7 @@ CellError lv2_event_queue::send(lv2_event event)
std::tie(ppu.gpr[4], ppu.gpr[5], ppu.gpr[6], ppu.gpr[7]) = event;
awake(&ppu);
awake(&ppu, notify_later);
}
else
{
@@ -407,6 +409,8 @@ error_code sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sys_e
return CELL_EINVAL;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(queue.mutex);
// "/dev_flash/vsh/module/msmw2.sprx" seems to rely on some cryptic shared memory behaviour that we don't emulate correctly
@@ -420,7 +424,7 @@ error_code sys_event_queue_receive(ppu_thread& ppu, u32 equeue_id, vm::ptr<sys_e
if (queue.events.empty())
{
queue.sq.emplace_back(&ppu);
queue.sleep(ppu, timeout);
queue.sleep(ppu, timeout, true);
return CELL_EBUSY;
}
@@ -671,13 +675,13 @@ error_code sys_event_port_send(u32 eport_id, u64 data1, u64 data2, u64 data3)
sys_event.trace("sys_event_port_send(eport_id=0x%x, data1=0x%llx, data2=0x%llx, data3=0x%llx)", eport_id, data1, data2, data3);
const auto port = idm::get<lv2_obj, lv2_event_port>(eport_id, [&](lv2_event_port& port) -> CellError
const auto port = idm::check<lv2_obj, lv2_event_port>(eport_id, [&](lv2_event_port& port) -> CellError
{
if (lv2_obj::check(port.queue))
{
const u64 source = port.name ? port.name : (s64{process_getpid()} << 32) | u64{eport_id};
return port.queue->send(source, data1, data2, data3);
return port.queue->send(source, data1, data2, data3, true);
}
return CELL_ENOTCONN;

View File

@@ -99,11 +99,11 @@ struct lv2_event_queue final : public lv2_obj
static void save_ptr(utils::serial&, lv2_event_queue*);
static std::shared_ptr<lv2_event_queue> load_ptr(utils::serial& ar, std::shared_ptr<lv2_event_queue>& queue);
CellError send(lv2_event);
CellError send(lv2_event event, bool notify_later = false);
CellError send(u64 source, u64 d1, u64 d2, u64 d3)
CellError send(u64 source, u64 d1, u64 d2, u64 d3, bool notify_later = false)
{
return send(std::make_tuple(source, d1, d2, d3));
return send(std::make_tuple(source, d1, d2, d3), notify_later);
}
// Get event queue by its global key

View File

@@ -150,6 +150,8 @@ error_code sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm
return {};
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(flag.mutex);
if (flag.pattern.fetch_op([&](u64& pat)
@@ -167,7 +169,7 @@ error_code sys_event_flag_wait(ppu_thread& ppu, u32 id, u64 bitptn, u32 mode, vm
flag.waiters++;
flag.sq.emplace_back(&ppu);
flag.sleep(ppu, timeout);
flag.sleep(ppu, timeout, true);
return CELL_EBUSY;
});

View File

@@ -127,6 +127,8 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
if (cond.waiters)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex);
if (cpu)
@@ -188,7 +190,7 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
if (result)
{
cond.awake(result);
cond.awake(result, true);
}
return 1;
@@ -255,6 +257,8 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
if (cond.waiters)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex);
u32 result = 0;
@@ -294,7 +298,7 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
if (need_awake)
{
lv2_obj::awake_all();
lv2_obj::awake_all(true);
}
return result;
@@ -341,6 +345,8 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
// Increment lwmutex's lwcond's waiters count
mutex->lwcond_waiters++;
lv2_obj::notify_all_t notify;
std::lock_guard lock(cond.mutex);
const bool mutex_sleep = sstate.try_read<bool>().second;
@@ -381,7 +387,7 @@ error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
}
// Sleep current thread and schedule lwmutex waiter
cond.sleep(ppu, timeout);
cond.sleep(ppu, timeout, true);
});
if (!cond || !mutex)

View File

@@ -144,6 +144,8 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
return true;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
auto [old, _] = mutex.signaled.fetch_op([](s32& value)
@@ -168,7 +170,7 @@ error_code _sys_lwmutex_lock(ppu_thread& ppu, u32 lwmutex_id, u64 timeout)
}
mutex.add_waiter(&ppu);
mutex.sleep(ppu, timeout);
mutex.sleep(ppu, timeout, true);
return false;
});
@@ -275,6 +277,8 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id)
const auto mutex = idm::check<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (const auto cpu = mutex.schedule<ppu_thread>(mutex.sq, mutex.protocol))
@@ -285,7 +289,7 @@ error_code _sys_lwmutex_unlock(ppu_thread& ppu, u32 lwmutex_id)
return;
}
mutex.awake(cpu);
mutex.awake(cpu, true);
return;
}
@@ -308,6 +312,8 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id)
const auto mutex = idm::check<lv2_obj, lv2_lwmutex>(lwmutex_id, [&](lv2_lwmutex& mutex)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (const auto cpu = mutex.schedule<ppu_thread>(mutex.sq, mutex.protocol))
@@ -319,7 +325,7 @@ error_code _sys_lwmutex_unlock2(ppu_thread& ppu, u32 lwmutex_id)
}
static_cast<ppu_thread*>(cpu)->gpr[3] = CELL_EBUSY;
mutex.awake(cpu);
mutex.awake(cpu, true);
return;
}

View File

@@ -139,6 +139,8 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
if (result == CELL_EBUSY)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (mutex.try_own(ppu, ppu.id))
@@ -147,7 +149,7 @@ error_code sys_mutex_lock(ppu_thread& ppu, u32 mutex_id, u64 timeout)
}
else
{
mutex.sleep(ppu, timeout);
mutex.sleep(ppu, timeout, true);
}
}
@@ -258,9 +260,31 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id)
sys_mutex.trace("sys_mutex_unlock(mutex_id=0x%x)", mutex_id);
const auto mutex = idm::check<lv2_obj, lv2_mutex>(mutex_id, [&](lv2_mutex& mutex)
const auto mutex = idm::check<lv2_obj, lv2_mutex>(mutex_id, [&](lv2_mutex& mutex) -> CellError
{
return mutex.try_unlock(ppu.id);
CellError result = mutex.try_unlock(ppu.id);
if (result == CELL_EBUSY)
{
lv2_obj::notify_all_t notify;
std::lock_guard lock(mutex.mutex);
if (auto cpu = mutex.reown<ppu_thread>())
{
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return {};
}
mutex.awake(cpu, true);
}
result = {};
}
return result;
});
if (!mutex)
@@ -268,22 +292,7 @@ error_code sys_mutex_unlock(ppu_thread& ppu, u32 mutex_id)
return CELL_ESRCH;
}
if (mutex.ret == CELL_EBUSY)
{
std::lock_guard lock(mutex->mutex);
if (auto cpu = mutex->reown<ppu_thread>())
{
if (cpu->state & cpu_flag::again)
{
ppu.state += cpu_flag::again;
return {};
}
mutex->awake(cpu);
}
}
else if (mutex.ret)
if (mutex.ret)
{
return mutex.ret;
}
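
sys_mutex_unlock is the motivating case for the whole commit. Before, the CELL_EBUSY path re-locked mutex->mutex in a second pass after idm::check returned and delivered the awake() notification on the spot; now reown<ppu_thread>() and awake(cpu, true) happen inside the callback, under the notify_all_t guard, so the next owner is recorded while the object lock is held but woken only after it is dropped. A toy model of that unlock path (toy_lv2_mutex is a deliberate simplification, not the emulator's lv2_mutex):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

struct toy_lv2_mutex
{
    std::mutex internal;        // stands in for lv2_mutex::mutex
    std::condition_variable cv;
    std::deque<int> waiters;    // waiting thread ids (the sleep queue)
    int owner = 0;              // 0 = unowned

    void lock(int id)
    {
        std::unique_lock lk(internal);
        if (owner == 0 && waiters.empty())
        {
            owner = id;         // uncontended fast path
            return;
        }
        waiters.push_back(id);
        cv.wait(lk, [&] { return owner == id; }); // woken after the handover
    }

    void unlock(int id)
    {
        int next = 0;
        {
            std::lock_guard lk(internal);
            if (owner != id)
                return;         // the real syscall reports an error here
            if (!waiters.empty())
            {
                next = waiters.front(); // reown: pick the next owner...
                waiters.pop_front();
            }
            owner = next;       // ...and transfer ownership under the lock
        }
        if (next)
            cv.notify_all();    // deliver the wake-up only after unlocking
    }
};
```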

View File

@@ -112,6 +112,8 @@ error_code sys_rwlock_rlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
}
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(rwlock.mutex);
const s64 _old = rwlock.owner.fetch_op([&](s64& val)
@@ -129,7 +131,7 @@ error_code sys_rwlock_rlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
if (_old > 0 || _old & 1)
{
rwlock.rq.emplace_back(&ppu);
rwlock.sleep(ppu, timeout);
rwlock.sleep(ppu, timeout, true);
return false;
}
@@ -334,6 +336,8 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
return val;
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(rwlock.mutex);
const s64 _old = rwlock.owner.fetch_op([&](s64& val)
@@ -351,7 +355,7 @@ error_code sys_rwlock_wlock(ppu_thread& ppu, u32 rw_lock_id, u64 timeout)
if (_old != 0)
{
rwlock.wq.emplace_back(&ppu);
rwlock.sleep(ppu, timeout);
rwlock.sleep(ppu, timeout, true);
}
return _old;

View File

@@ -121,12 +121,14 @@ error_code sys_semaphore_wait(ppu_thread& ppu, u32 sem_id, u64 timeout)
}
}
lv2_obj::notify_all_t notify;
std::lock_guard lock(sema.mutex);
if (sema.val-- <= 0)
{
sema.sq.emplace_back(&ppu);
sema.sleep(ppu, timeout);
sema.sleep(ppu, timeout, true);
return false;
}

View File

@@ -165,17 +165,17 @@ public:
private:
// Remove the current thread from the scheduling queue, register timeout
static void sleep_unlocked(cpu_thread&, u64 timeout);
static void sleep_unlocked(cpu_thread&, u64 timeout, bool notify_later);
// Schedule the thread
static bool awake_unlocked(cpu_thread*, s32 prio = enqueue_cmd);
static bool awake_unlocked(cpu_thread*, bool notify_later = false, s32 prio = enqueue_cmd);
public:
static constexpr u64 max_timeout = u64{umax} / 1000;
static void sleep(cpu_thread& cpu, const u64 timeout = 0);
static void sleep(cpu_thread& cpu, const u64 timeout = 0, bool notify_later = false);
static bool awake(cpu_thread* const thread, s32 prio = enqueue_cmd);
static bool awake(cpu_thread* const thread, bool notify_later = false, s32 prio = enqueue_cmd);
// Returns true on successful context switch, false otherwise
static bool yield(cpu_thread& thread);
@@ -183,12 +183,12 @@ public:
static void set_priority(cpu_thread& thread, s32 prio)
{
ensure(prio + 512u < 3712);
awake(&thread, prio);
awake(&thread, false, prio);
}
static inline void awake_all()
static inline void awake_all(bool notify_later = false)
{
awake({});
awake({}, notify_later);
g_to_awake.clear();
}
@@ -433,6 +433,28 @@ public:
return true;
}
static inline void notify_all()
{
for (auto cpu : g_to_notify)
{
if (!cpu)
{
g_to_notify[0] = nullptr;
return;
}
cpu->state.notify_one(cpu_flag::suspend + cpu_flag::signal);
}
}
struct notify_all_t
{
~notify_all_t() noexcept
{
lv2_obj::notify_all();
}
};
// Scheduler mutex
static shared_mutex g_mutex;
@@ -452,5 +474,8 @@ private:
// Threads which must call lv2_obj::sleep before the scheduler starts
static std::deque<class cpu_thread*> g_to_sleep;
static void schedule_all();
// Pending list of threads to notify
static thread_local std::add_pointer_t<class cpu_thread> g_to_notify[4];
static void schedule_all(bool notify_later);
};
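
Putting the new header pieces together: g_to_notify is the per-thread pending list, notify_all() is the sentinel-terminated flush (it resets slot 0 so the list is empty for the next user), and notify_all_t triggers that flush from its destructor. A compact, runnable analogue of all three (every name below is a stand-in for the real declarations above):

```cpp
#include <array>
#include <cstdio>
#include <mutex>

struct waiter
{
    const char* name;
    void notify() { std::printf("notify %s\n", name); }
};

thread_local std::array<waiter*, 4> pending{}; // g_to_notify analogue

void flush_pending() // notify_all() analogue
{
    for (waiter* w : pending)
    {
        if (!w)
        {
            pending[0] = nullptr; // reset the list for the next caller
            return;
        }
        w->notify();
    }
}

struct notify_guard // notify_all_t analogue
{
    ~notify_guard() noexcept { flush_pending(); }
};

std::mutex sched_mutex;

int main()
{
    waiter a{"A"}, b{"B"};
    {
        notify_guard guard;                // flushes after the lock below
        std::lock_guard lock(sched_mutex); // scheduler/object lock
        pending[0] = &a;                   // parked instead of notified
        pending[1] = &b;
        pending[2] = nullptr;              // terminator
        std::puts("lock held: wake-ups queued");
    } // lock released first, then the guard flushes: notify A, notify B
}
```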