Fix warning in RSXOffload.cpp (rewrite thread)

Nekotekina 2020-02-23 14:19:23 +03:00
parent 7069e7265f
commit 18db020b93
2 changed files with 73 additions and 62 deletions

RSXOffload.cpp

@@ -10,25 +10,23 @@
 namespace rsx
 {
-	// initialization
-	void dma_manager::init()
+	struct dma_manager::offload_thread
 	{
-		m_worker_state = thread_state::created;
-		m_enqueued_count.store(0);
-		m_processed_count = 0;
-
-		// Empty work queue in case of stale contents
-		m_work_queue.pop_all();
-
-		thread_ctrl::spawn("RSX offloader", [this]()
+		lf_queue<transport_packet> m_work_queue;
+		atomic_t<u64> m_enqueued_count = 0;
+		atomic_t<u64> m_processed_count = 0;
+		transport_packet* m_current_job = nullptr;
+		std::thread::id m_thread_id;
+
+		void operator()()
 		{
 			if (!g_cfg.video.multithreaded_rsx)
 			{
-				// Abort
+				// Abort if disabled
 				return;
 			}

 			// Register thread id
 			m_thread_id = std::this_thread::get_id();

 			if (g_cfg.core.thread_scheduler_enabled)
@@ -36,47 +34,62 @@ namespace rsx
 				thread_ctrl::set_thread_affinity_mask(thread_ctrl::get_affinity_mask(thread_class::rsx));
 			}

-			while (m_worker_state != thread_state::finished)
+			while (thread_ctrl::state() != thread_state::aborting)
 			{
-				if (m_enqueued_count.load() != m_processed_count)
+				for (auto&& job : m_work_queue.pop_all())
 				{
-					for (m_current_job = m_work_queue.pop_all(); m_current_job; m_current_job.pop_front())
-					{
-						switch (m_current_job->type)
-						{
-						case raw_copy:
-							memcpy(m_current_job->dst, m_current_job->src, m_current_job->length);
-							break;
-						case vector_copy:
-							memcpy(m_current_job->dst, m_current_job->opt_storage.data(), m_current_job->length);
-							break;
-						case index_emulate:
-							write_index_array_for_non_indexed_non_native_primitive_to_buffer(
-								reinterpret_cast<char*>(m_current_job->dst),
-								static_cast<rsx::primitive_type>(m_current_job->aux_param0),
-								m_current_job->length);
-							break;
-						case callback:
-							rsx::get_current_renderer()->renderctl(m_current_job->aux_param0, m_current_job->src);
-							break;
-						default:
-							ASSUME(0);
-							fmt::throw_exception("Unreachable" HERE);
-						}
-
-						std::atomic_thread_fence(std::memory_order_release);
-						++m_processed_count;
-					}
-				}
-				else
-				{
-					// Yield
-					std::this_thread::yield();
-				}
-			}
-
-			m_processed_count = m_enqueued_count.load();
-		});
-	}
+					m_current_job = &job;
+
+					switch (job.type)
+					{
+					case raw_copy:
+					{
+						std::memcpy(job.dst, job.src, job.length);
+						break;
+					}
+					case vector_copy:
+					{
+						std::memcpy(job.dst, job.opt_storage.data(), job.length);
+						break;
+					}
+					case index_emulate:
+					{
+						write_index_array_for_non_indexed_non_native_primitive_to_buffer(static_cast<char*>(job.dst), static_cast<rsx::primitive_type>(job.aux_param0), job.length);
+						break;
+					}
+					case callback:
+					{
+						rsx::get_current_renderer()->renderctl(job.aux_param0, job.src);
+						break;
+					}
+					default: ASSUME(0); fmt::throw_exception("Unreachable" HERE);
+					}
+
+					m_processed_count.release(m_processed_count + 1);
+				}
+
+				m_current_job = nullptr;
+
+				if (m_enqueued_count.load() == m_processed_count.load())
+				{
+					// Yield
+					m_processed_count.notify_all();
+					std::this_thread::yield();
+				}
+			}
+
+			m_processed_count = -1;
+			m_processed_count.notify_all();
+		}
+
+		static constexpr auto name = "RSX Offloader"sv;
+	};
+
+	using dma_thread = named_thread<dma_manager::offload_thread>;
+
+	// initialization
+	void dma_manager::init()
+	{
+	}

 	// General transport
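The rewritten worker loop drains m_work_queue.pop_all() in batches, publishes progress through m_processed_count with a release store, and yields only once it has caught up with m_enqueued_count. A minimal standard-library sketch of the same drain-then-yield shape, with all names illustrative (RPCS3's lf_queue is lock-free; a mutex stands in for it here):

#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>

struct worker
{
	std::deque<int> queue;                    // stand-in for lf_queue<transport_packet>
	std::mutex queue_lock;                    // the real queue is lock-free; a mutex keeps this short
	std::atomic<std::uint64_t> enqueued{0};
	std::atomic<std::uint64_t> processed{0};
	std::atomic<bool> aborting{false};

	void push(int job)
	{
		enqueued++;
		std::lock_guard lk(queue_lock);
		queue.push_back(job);
	}

	void operator()()
	{
		while (!aborting.load())
		{
			std::deque<int> batch;
			{
				// Grab everything queued so far in one shot (mirrors pop_all())
				std::lock_guard lk(queue_lock);
				batch.swap(queue);
			}

			for (int job : batch)
			{
				(void)job;                    // the real loop dispatches on job.type here
				processed.store(processed + 1, std::memory_order_release);
			}

			if (enqueued.load() == processed.load())
			{
				processed.notify_all();       // wake any waiter (C++20)
				std::this_thread::yield();    // caught up; give up the core
			}
		}
	}
};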
@@ -88,8 +101,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, src, length);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, src, length);
 		}
 	}
@@ -101,8 +114,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, src, length);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, src, length);
 		}
 	}
@@ -116,8 +129,8 @@ namespace rsx
 		}
 		else
 		{
-			++m_enqueued_count;
-			m_work_queue.push(dst, primitive, count);
+			g_fxo->get<dma_thread>()->m_enqueued_count++;
+			g_fxo->get<dma_thread>()->m_work_queue.push(dst, primitive, count);
 		}
 	}
@@ -126,19 +139,21 @@ namespace rsx
 	{
 		verify(HERE), g_cfg.video.multithreaded_rsx;

-		++m_enqueued_count;
-		m_work_queue.push(request_code, args);
+		g_fxo->get<dma_thread>()->m_enqueued_count++;
+		g_fxo->get<dma_thread>()->m_work_queue.push(request_code, args);
 	}

 	// Synchronization
 	bool dma_manager::is_current_thread() const
 	{
-		return (std::this_thread::get_id() == m_thread_id);
+		return std::this_thread::get_id() == g_fxo->get<dma_thread>()->m_thread_id;
 	}

 	bool dma_manager::sync()
 	{
-		if (m_enqueued_count.load() == m_processed_count) [[likely]]
+		const auto _thr = g_fxo->get<dma_thread>();
+
+		if (_thr->m_enqueued_count.load() <= _thr->m_processed_count.load()) [[likely]]
 		{
 			// Nothing to do
 			return true;
@@ -152,7 +167,7 @@ namespace rsx
 			return false;
 		}

-		while (m_enqueued_count.load() != m_processed_count)
+		while (_thr->m_enqueued_count.load() > _thr->m_processed_count.load())
 		{
 			rsxthr->on_semaphore_acquire_wait();
 			_mm_pause();
@@ -160,7 +175,7 @@ namespace rsx
 		}
 		else
 		{
-			while (m_enqueued_count.load() != m_processed_count)
+			while (_thr->m_enqueued_count.load() > _thr->m_processed_count.load())
 				_mm_pause();
 		}
@@ -169,7 +184,7 @@ namespace rsx
 	void dma_manager::join()
 	{
-		m_worker_state = thread_state::finished;
+		*g_fxo->get<dma_thread>() = thread_state::aborting;
 		sync();
 	}
@@ -188,7 +203,7 @@ namespace rsx
 	// Fault recovery
 	utils::address_range dma_manager::get_fault_range(bool writing) const
 	{
-		verify(HERE), m_current_job;
+		const auto m_current_job = verify(HERE, g_fxo->get<dma_thread>()->m_current_job);

 		void *address = nullptr;
 		u32 range = m_current_job->length;
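One subtlety in the rewritten sync(): on shutdown the worker stores u64(-1), the maximum value, into m_processed_count, so the comparisons become <= and > rather than == and !=; once the sentinel lands, any later sync() sees enqueued <= processed and returns instead of spinning on a thread that no longer runs. A hypothetical sketch of that protocol (the commit's own wait loops busy-wait with _mm_pause(); this sketch instead blocks on the C++20 atomic wait that pairs with the notify_all() calls added above):

#include <atomic>
#include <cstdint>

// Wait until every job enqueued so far has been processed; return immediately
// once the shutdown sentinel (u64 max) has been published. Hypothetical names.
bool sync(const std::atomic<std::uint64_t>& enqueued,
          const std::atomic<std::uint64_t>& processed)
{
	if (enqueued.load() <= processed.load())  // also true after the -1 sentinel
	{
		return true;                          // nothing outstanding
	}

	while (enqueued.load() > processed.load())
	{
		processed.wait(processed.load());     // C++20: sleep until notify_all()
	}

	return true;
}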

RSXOffload.h

@@ -48,13 +48,7 @@ namespace rsx
 			{}
 		};

-		lf_queue<transport_packet> m_work_queue;
-		lf_queue_slice<transport_packet> m_current_job;
-		atomic_t<u64> m_enqueued_count{ 0 };
-		volatile u64 m_processed_count = 0;
-		thread_state m_worker_state = thread_state::detached;
-		std::thread::id m_thread_id;
-		atomic_t<bool> m_mem_fault_flag{ false };
+		atomic_t<bool> m_mem_fault_flag = false;

 		// TODO: Improved benchmarks here; value determined by profiling on a Ryzen CPU, rounded to the nearest 512 bytes
 		const u32 max_immediate_transfer_size = 3584;
@@ -84,5 +78,7 @@ namespace rsx
 		// Fault recovery
 		utils::address_range get_fault_range(bool writing) const;

+		struct offload_thread;
 	};
 }
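The header now keeps only a forward declaration of offload_thread; the queue, counters, and thread id move into that struct, which named_thread<dma_manager::offload_thread> runs and g_fxo owns as a global fixed object. A rough standard-C++ analogue of the callable-struct thread pattern, assuming nothing about RPCS3's named_thread beyond what the diff shows:

#include <functional>
#include <stop_token>
#include <string_view>
#include <thread>

// The thread body is a plain struct: its members are the worker's state, its
// operator() is the entry point, and `name` labels the thread, mirroring
// `static constexpr auto name = "RSX Offloader"sv` in the diff.
struct offload_worker
{
	static constexpr std::string_view name = "RSX Offloader";

	void operator()(std::stop_token stop)
	{
		while (!stop.stop_requested())        // stands in for the thread_ctrl::state() check
		{
			// ... drain the work queue, as in the diff above ...
			std::this_thread::yield();
		}
	}
};

int main()
{
	offload_worker body;                  // state outlives the thread, like a g_fxo object
	std::jthread thr{std::ref(body)};     // starts the loop and passes it a stop_token
}                                         // ~jthread() requests stop, then joins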