diff --git a/rpcs3/Emu/RSX/RSXOffload.cpp b/rpcs3/Emu/RSX/RSXOffload.cpp
index e9f8465060..84bed103bd 100644
--- a/rpcs3/Emu/RSX/RSXOffload.cpp
+++ b/rpcs3/Emu/RSX/RSXOffload.cpp
@@ -10,25 +10,23 @@ namespace rsx
 {
-	// initialization
-	void dma_manager::init()
+	struct dma_manager::offload_thread
 	{
-		m_worker_state = thread_state::created;
-		m_enqueued_count.store(0);
-		m_processed_count = 0;
+		lf_queue<transport_packet> m_work_queue;
+		atomic_t<u64> m_enqueued_count = 0;
+		atomic_t<u64> m_processed_count = 0;
+		transport_packet* m_current_job = nullptr;
 
-		// Empty work queue in case of stale contents
-		m_work_queue.pop_all();
+		std::thread::id m_thread_id;
 
-		thread_ctrl::spawn("RSX offloader", [this]()
+		void operator()()
 		{
 			if (!g_cfg.video.multithreaded_rsx)
 			{
-				// Abort
+				// Abort if disabled
 				return;
 			}
 
-			// Register thread id
 			m_thread_id = std::this_thread::get_id();
 
 			if (g_cfg.core.thread_scheduler_enabled)
@@ -36,47 +34,62 @@ namespace rsx
 				thread_ctrl::set_thread_affinity_mask(thread_ctrl::get_affinity_mask(thread_class::rsx));
 			}
 
-			while (m_worker_state != thread_state::finished)
+			while (thread_ctrl::state() != thread_state::aborting)
 			{
-				if (m_enqueued_count.load() != m_processed_count)
+				for (auto&& job : m_work_queue.pop_all())
 				{
-					for (m_current_job = m_work_queue.pop_all(); m_current_job; m_current_job.pop_front())
-					{
-						switch (m_current_job->type)
-						{
-						case raw_copy:
-							memcpy(m_current_job->dst, m_current_job->src, m_current_job->length);
-							break;
-						case vector_copy:
-							memcpy(m_current_job->dst, m_current_job->opt_storage.data(), m_current_job->length);
-							break;
-						case index_emulate:
-							write_index_array_for_non_indexed_non_native_primitive_to_buffer(
-								reinterpret_cast<char*>(m_current_job->dst),
-								static_cast<rsx::primitive_type>(m_current_job->aux_param0),
-								m_current_job->length);
-							break;
-						case callback:
-							rsx::get_current_renderer()->renderctl(m_current_job->aux_param0, m_current_job->src);
-							break;
-						default:
-							ASSUME(0);
-							fmt::throw_exception("Unreachable" HERE);
-						}
+					m_current_job = &job;
 
-						std::atomic_thread_fence(std::memory_order_release);
-						++m_processed_count;
+					switch (job.type)
+					{
+					case raw_copy:
+					{
+						std::memcpy(job.dst, job.src, job.length);
+						break;
 					}
+					case vector_copy:
+					{
+						std::memcpy(job.dst, job.opt_storage.data(), job.length);
+						break;
+					}
+					case index_emulate:
+					{
+						write_index_array_for_non_indexed_non_native_primitive_to_buffer(static_cast<char*>(job.dst), static_cast<rsx::primitive_type>(job.aux_param0), job.length);
+						break;
+					}
+					case callback:
+					{
+						rsx::get_current_renderer()->renderctl(job.aux_param0, job.src);
+						break;
+					}
+					default: ASSUME(0); fmt::throw_exception("Unreachable" HERE);
+					}
+
+					m_processed_count.release(m_processed_count + 1);
 				}
-				else
+
+				m_current_job = nullptr;
+
+				if (m_enqueued_count.load() == m_processed_count.load())
 				{
-					// Yield
+					m_processed_count.notify_all();
 					std::this_thread::yield();
 				}
 			}
 
-			m_processed_count = m_enqueued_count.load();
-		});
+			m_processed_count = -1;
+			m_processed_count.notify_all();
+		}
+
+		static constexpr auto name = "RSX Offloader"sv;
+	};
+
+
+	using dma_thread = named_thread<dma_manager::offload_thread>;
+
+	// initialization
+	void dma_manager::init()
+	{
 	}
 
 	// General transport
@@ -88,8 +101,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, src, length);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, src, length);
 		}
 	}
 
@@ -101,8 +114,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, src, length);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, src, length);
 		}
 	}
 
@@ -116,8 +129,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, primitive, count);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, primitive, count);
 		}
 	}
 
@@ -126,19 +139,21 @@ namespace rsx
 	{
 		verify(HERE), g_cfg.video.multithreaded_rsx;
 
-		++m_enqueued_count;
-		m_work_queue.push(request_code, args);
+		g_fxo->get<dma_thread>()->m_enqueued_count++;
+		g_fxo->get<dma_thread>()->m_work_queue.push(request_code, args);
 	}
 
 	// Synchronization
 	bool dma_manager::is_current_thread() const
 	{
-		return (std::this_thread::get_id() == m_thread_id);
+		return std::this_thread::get_id() == g_fxo->get<dma_thread>()->m_thread_id;
 	}
 
 	bool dma_manager::sync()
 	{
-		if (m_enqueued_count.load() == m_processed_count) [[likely]]
+		const auto _thr = g_fxo->get<dma_thread>();
+
+		if (_thr->m_enqueued_count.load() <= _thr->m_processed_count.load()) [[likely]]
 		{
 			// Nothing to do
 			return true;
@@ -152,7 +167,7 @@ namespace rsx
 				return false;
 			}
 
-			while (m_enqueued_count.load() != m_processed_count)
+			while (_thr->m_enqueued_count.load() > _thr->m_processed_count.load())
 			{
 				rsxthr->on_semaphore_acquire_wait();
 				_mm_pause();
@@ -160,7 +175,7 @@ namespace rsx
 		}
 		else
 		{
-			while (m_enqueued_count.load() != m_processed_count)
+			while (_thr->m_enqueued_count.load() > _thr->m_processed_count.load())
 				_mm_pause();
 		}
 
@@ -169,7 +184,7 @@ namespace rsx
 
 	void dma_manager::join()
 	{
-		m_worker_state = thread_state::finished;
+		*g_fxo->get<dma_thread>() = thread_state::aborting;
 		sync();
 	}
 
@@ -188,7 +203,7 @@ namespace rsx
 	// Fault recovery
 	utils::address_range dma_manager::get_fault_range(bool writing) const
 	{
-		verify(HERE), m_current_job;
+		const auto m_current_job = verify(HERE, g_fxo->get<dma_thread>()->m_current_job);
 
 		void *address = nullptr;
 		u32 range = m_current_job->length;
diff --git a/rpcs3/Emu/RSX/RSXOffload.h b/rpcs3/Emu/RSX/RSXOffload.h
index 5bacb9b4ff..b1e52275d7 100644
--- a/rpcs3/Emu/RSX/RSXOffload.h
+++ b/rpcs3/Emu/RSX/RSXOffload.h
@@ -48,13 +48,7 @@ namespace rsx
 			{}
 		};
 
-		lf_queue<transport_packet> m_work_queue;
-		lf_queue_slice<transport_packet> m_current_job;
-		atomic_t<u64> m_enqueued_count{ 0 };
-		volatile u64 m_processed_count = 0;
-		thread_state m_worker_state = thread_state::detached;
-		std::thread::id m_thread_id;
-		atomic_t<bool> m_mem_fault_flag{ false };
+		atomic_t<bool> m_mem_fault_flag = false;
 
 		// TODO: Improved benchmarks here; value determined by profiling on a Ryzen CPU, rounded to the nearest 512 bytes
 		const u32 max_immediate_transfer_size = 3584;
@@ -84,5 +78,7 @@ namespace rsx
 
 		// Fault recovery
 		utils::address_range get_fault_range(bool writing) const;
+
+		struct offload_thread;
 	};
 }
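
For readers not familiar with the offloader pattern this patch moves to, the sketch below shows the same idea with only the standard library: a worker thread drains a job queue while an enqueued/processed counter pair lets producers synchronize against it. Every name here (offload_queue, job, enqueue, sync) is a hypothetical illustration, not RPCS3 API; the real patch relies on RPCS3's lock-free lf_queue, the named_thread wrapper and the g_fxo fixed-object manager, none of which this stand-in reproduces.

// offload_sketch.cpp -- illustrative only; names and structure are assumptions,
// not the RPCS3 implementation.
#include <atomic>
#include <cstdint>
#include <cstring>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// A single DMA-style job: copy `length` bytes from `src` to `dst`
// (mirrors the raw_copy case in the patch).
struct job
{
	void* dst;
	const void* src;
	std::uint32_t length;
};

class offload_queue
{
	std::deque<job> m_queue;                   // stand-in for the lock-free lf_queue
	std::mutex m_mutex;
	std::atomic<std::uint64_t> m_enqueued{0};
	std::atomic<std::uint64_t> m_processed{0};
	std::atomic<bool> m_stop{false};
	std::thread m_worker;                      // stand-in for named_thread

public:
	offload_queue() : m_worker([this] { run(); }) {}

	~offload_queue()
	{
		m_stop = true;
		m_worker.join();
	}

	// Producer side: publish a job, then bump the enqueued counter.
	void enqueue(void* dst, const void* src, std::uint32_t length)
	{
		{
			std::lock_guard lock(m_mutex);
			m_queue.push_back({dst, src, length});
		}
		++m_enqueued;
	}

	// Producer side: spin until the worker has drained everything we queued,
	// the same enqueued/processed comparison dma_manager::sync() performs.
	void sync() const
	{
		while (m_processed.load() < m_enqueued.load())
			std::this_thread::yield();
	}

private:
	// Consumer loop run by the worker thread.
	void run()
	{
		while (!m_stop.load())
		{
			job j{};
			bool have_job = false;
			{
				std::lock_guard lock(m_mutex);
				if (!m_queue.empty())
				{
					j = m_queue.front();
					m_queue.pop_front();
					have_job = true;
				}
			}

			if (have_job)
			{
				std::memcpy(j.dst, j.src, j.length); // the raw_copy case
				++m_processed;                       // advance only after the work is done
			}
			else
			{
				std::this_thread::yield();           // idle, as the patched loop does
			}
		}
	}
};

int main()
{
	std::vector<char> src(4096, 'x'), dst(4096);
	offload_queue q;
	q.enqueue(dst.data(), src.data(), 4096);
	q.sync(); // dst now holds the copied bytes
}

The property this sketch shares with the patch is that producers never block the worker: they only compare the two counters, and the worker alone advances the processed count.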