diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index 109d2ccc97..1c181c5877 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -4265,6 +4265,40 @@ bool spu_thread::set_ch_value(u32 ch, u32 value) fmt::throw_exception("Unknown/illegal channel in WRCH (ch=%d [%s], value=0x%x)", ch, ch < 128 ? spu_ch_name[ch] : "???", value); } +extern void resume_spu_thread_group_from_waiting(spu_thread& spu) +{ + const auto group = spu.group; + + std::lock_guard lock(group->mutex); + + if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING) + { + group->run_state = SPU_THREAD_GROUP_STATUS_RUNNING; + } + else if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING_AND_SUSPENDED) + { + group->run_state = SPU_THREAD_GROUP_STATUS_SUSPENDED; + } + + for (auto& thread : group->threads) + { + if (thread) + { + if (thread.get() == &spu) + { + constexpr auto flags = cpu_flag::suspend + cpu_flag::signal; + ensure(((thread->state ^= flags) & flags) == cpu_flag::signal); + } + else + { + thread->state -= cpu_flag::suspend; + } + + thread->state.notify_one(cpu_flag::suspend + cpu_flag::signal); + } + } +} + bool spu_thread::stop_and_signal(u32 code) { spu_log.trace("stop_and_signal(code=0x%x)", code); @@ -4293,6 +4327,23 @@ bool spu_thread::stop_and_signal(u32 code) return true; } + auto get_queue = [this](u32 spuq) -> const std::shared_ptr& + { + for (auto& v : this->spuq) + { + if (spuq == v.first) + { + if (lv2_obj::check(v.second)) + { + return v.second; + } + } + } + + static const std::shared_ptr empty; + return empty; + }; + switch (code) { case 0x001: @@ -4338,6 +4389,8 @@ bool spu_thread::stop_and_signal(u32 code) spu_function_logger logger(*this, "sys_spu_thread_receive_event"); + std::shared_ptr queue; + while (true) { // Check group status, wait if necessary @@ -4355,7 +4408,15 @@ bool spu_thread::stop_and_signal(u32 code) thread_ctrl::wait_on(state, old);; } - std::lock_guard lock(group->mutex); + reader_lock{group->mutex}, queue = get_queue(spuq); + + if (!queue) + { + return ch_in_mbox.set_values(1, CELL_EINVAL), true; + } + + // Lock queue's mutex first, then group's mutex + std::scoped_lock lock(queue->mutex, group->mutex); if (is_stopped()) { @@ -4368,27 +4429,12 @@ bool spu_thread::stop_and_signal(u32 code) continue; } - lv2_event_queue* queue = nullptr; - - for (auto& v : this->spuq) + if (queue != get_queue(spuq)) { - if (spuq == v.first) - { - if (lv2_obj::check(v.second)) - { - queue = v.second.get(); - break; - } - } + // Try again + continue; } - if (!queue) - { - return ch_in_mbox.set_values(1, CELL_EINVAL), true; - } - - std::lock_guard qlock(queue->mutex); - if (!queue->exists) { return ch_in_mbox.set_values(1, CELL_EINVAL), true; @@ -4440,30 +4486,6 @@ bool spu_thread::stop_and_signal(u32 code) thread_ctrl::wait_on(state, old); } - std::lock_guard lock(group->mutex); - - if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING) - { - group->run_state = SPU_THREAD_GROUP_STATUS_RUNNING; - } - else if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING_AND_SUSPENDED) - { - group->run_state = SPU_THREAD_GROUP_STATUS_SUSPENDED; - } - - for (auto& thread : group->threads) - { - if (thread) - { - thread->state -= cpu_flag::suspend; - - if (thread.get() != this) - { - thread->state.notify_one(cpu_flag::suspend); - } - } - } - return true; } @@ -4486,29 +4508,36 @@ bool spu_thread::stop_and_signal(u32 code) spu_log.trace("sys_spu_thread_tryreceive_event(spuq=0x%x)", spuq); - std::lock_guard lock(group->mutex); + std::shared_ptr queue; - lv2_event_queue* queue = nullptr; + reader_lock{group->mutex}, queue = get_queue(spuq); - for (auto& v : this->spuq) + std::unique_lock qlock, group_lock; + + while (true) { - if (spuq == v.first) + if (!queue) { - if (lv2_obj::check(v.second)) - { - queue = v.second.get(); - break; - } + return ch_in_mbox.set_values(1, CELL_EINVAL), true; + } + + // Lock queue's mutex first, then group's mutex + qlock = std::unique_lock{queue->mutex}; + group_lock = std::unique_lock{group->mutex}; + + if (const auto& queue0 = get_queue(spuq); queue != queue0) + { + // Keep atleast one reference of the pointer so mutex unlock can work + const auto old_ref = std::exchange(queue, queue0); + group_lock.unlock(); + qlock.unlock(); + } + else + { + break; } } - if (!queue) - { - return ch_in_mbox.set_values(1, CELL_EINVAL), true; - } - - std::lock_guard qlock(queue->mutex); - if (!queue->exists) { return ch_in_mbox.set_values(1, CELL_EINVAL), true; diff --git a/rpcs3/Emu/Cell/lv2/sys_event.cpp b/rpcs3/Emu/Cell/lv2/sys_event.cpp index 5f7ea41942..683804f576 100644 --- a/rpcs3/Emu/Cell/lv2/sys_event.cpp +++ b/rpcs3/Emu/Cell/lv2/sys_event.cpp @@ -32,6 +32,8 @@ std::shared_ptr lv2_event_queue::find(u64 ipc_key) return g_fxo->get>().get(ipc_key); } +extern void resume_spu_thread_group_from_waiting(spu_thread& spu); + CellError lv2_event_queue::send(lv2_event event) { std::lock_guard lock(mutex); @@ -74,9 +76,7 @@ CellError lv2_event_queue::send(lv2_event event) const u32 data2 = static_cast(std::get<2>(event)); const u32 data3 = static_cast(std::get<3>(event)); spu.ch_in_mbox.set_values(4, CELL_OK, data1, data2, data3); - - spu.state += cpu_flag::signal; - spu.state.notify_one(cpu_flag::signal); + resume_spu_thread_group_from_waiting(spu); } return {}; @@ -161,11 +161,15 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode) if (mode == SYS_EVENT_QUEUE_DESTROY_FORCE) { + std::deque sq; + std::lock_guard lock(queue->mutex); + sq = std::move(queue->sq); + if (queue->type == SYS_PPU_QUEUE) { - for (auto cpu : queue->sq) + for (auto cpu : sq) { static_cast(*cpu).gpr[3] = CELL_ECANCELED; queue->append(cpu); @@ -178,11 +182,10 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode) } else { - for (auto cpu : queue->sq) + for (auto cpu : sq) { static_cast(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED); - cpu->state += cpu_flag::signal; - cpu->state.notify_one(cpu_flag::signal); + resume_spu_thread_group_from_waiting(static_cast(*cpu)); } } }