sys_lwcond/cond/ppu_thread: Respect scheduler in various syscalls

Authored by Eladash on 2023-05-24 21:19:49 +03:00; committed by Ivan
parent 5d4e87373f
commit d152537e50
3 changed files with 402 additions and 202 deletions
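
Every syscall touched here gets the same shape: instead of delivering the signal in a single pass, the handler now loops, flags itself as waiting, and redoes the whole operation whenever the scheduler suspended the calling thread mid-signal. As a rough illustration of that retry shape (a minimal, self-contained sketch; names such as cond_object, try_signal_once and suspended are hypothetical stand-ins, not RPCS3 APIs):

#include <atomic>
#include <mutex>

struct cond_object
{
	std::mutex queue_lock;
	// ... wait queue would live here ...
};

// One attempt at delivering a signal. Returns false when the calling thread
// was suspended mid-operation (e.g. by another signal), in which case the
// caller must retry after being resumed; waking a waiter against a stale
// view of the queue could produce unexpected results.
bool try_signal_once(cond_object& cond, const std::atomic<bool>& suspended)
{
	std::lock_guard lock(cond.queue_lock);

	if (suspended.load())
	{
		return false; // mirrors the 'finished = false' path in the diff
	}

	// ... pick a waiter under the lock and wake it ...
	return true;
}

void signal_with_retry(cond_object& cond, const std::atomic<bool>& suspended)
{
	while (!try_signal_once(cond, suspended))
	{
		// The real syscalls run ppu.test_stopped()/check_state() here so
		// the scheduler can settle before the operation is retried.
	}
}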

sys_cond.cpp

@@ -143,36 +143,70 @@ error_code sys_cond_signal(ppu_thread& ppu, u32 cond_id)
 	sys_cond.trace("sys_cond_signal(cond_id=0x%x)", cond_id);

-	const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond)
-	{
-		if (atomic_storage<ppu_thread*>::load(cond.sq))
-		{
-			std::lock_guard lock(cond.mutex->mutex);
-
-			if (const auto cpu = cond.schedule<ppu_thread>(cond.sq, cond.mutex->protocol))
-			{
-				if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
-				{
-					ppu.state += cpu_flag::again;
-					return;
-				}
-
-				// TODO: Is EBUSY returned after requeuing, on sys_cond_destroy?
-
-				if (cond.mutex->try_own(*cpu))
-				{
-					cond.awake(cpu);
-				}
-			}
-		}
-	});
-
-	if (!cond)
-	{
-		return CELL_ESRCH;
-	}
-
-	return CELL_OK;
+	while (true)
+	{
+		if (ppu.test_stopped())
+		{
+			ppu.state += cpu_flag::again;
+			return {};
+		}
+
+		bool finished = true;
+
+		ppu.state += cpu_flag::wait;
+
+		const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond)
+		{
+			if (atomic_storage<ppu_thread*>::load(cond.sq))
+			{
+				std::lock_guard lock(cond.mutex->mutex);
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					// Test if another signal caused the current thread to be suspended, in which case it needs to wait until the thread wakes up (otherwise the signal may cause unexpected results)
+					finished = false;
+					return;
+				}
+
+				if (const auto cpu = cond.schedule<ppu_thread>(cond.sq, cond.mutex->protocol))
+				{
+					if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
+					{
+						ppu.state += cpu_flag::again;
+						return;
+					}
+
+					// TODO: Is EBUSY returned after requeuing, on sys_cond_destroy?
+
+					if (cond.mutex->try_own(*cpu))
+					{
+						cond.awake(cpu);
+					}
+				}
+			}
+			else
+			{
+				cond.mutex->mutex.lock_unlock();
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					finished = false;
+				}
+			}
+		});
+
+		if (!finished)
+		{
+			continue;
+		}
+
+		if (!cond)
+		{
+			return CELL_ESRCH;
+		}
+
+		return CELL_OK;
+	}
 }

 error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
@@ -181,46 +215,80 @@ error_code sys_cond_signal_all(ppu_thread& ppu, u32 cond_id)
 	sys_cond.trace("sys_cond_signal_all(cond_id=0x%x)", cond_id);

-	const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond)
-	{
-		if (atomic_storage<ppu_thread*>::load(cond.sq))
-		{
-			std::lock_guard lock(cond.mutex->mutex);
-
-			for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
-			{
-				if (cpu->state & cpu_flag::again)
-				{
-					ppu.state += cpu_flag::again;
-					return;
-				}
-			}
-
-			cpu_thread* result = nullptr;
-			auto sq = cond.sq;
-			atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
-
-			while (const auto cpu = cond.schedule<ppu_thread>(sq, SYS_SYNC_PRIORITY))
-			{
-				if (cond.mutex->try_own(*cpu))
-				{
-					ensure(!std::exchange(result, cpu));
-				}
-			}
-
-			if (result)
-			{
-				cond.awake(result);
-			}
-		}
-	});
-
-	if (!cond)
-	{
-		return CELL_ESRCH;
-	}
-
-	return CELL_OK;
+	while (true)
+	{
+		if (ppu.test_stopped())
+		{
+			ppu.state += cpu_flag::again;
+			return {};
+		}
+
+		bool finished = true;
+
+		ppu.state += cpu_flag::wait;
+
+		const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond)
+		{
+			if (atomic_storage<ppu_thread*>::load(cond.sq))
+			{
+				std::lock_guard lock(cond.mutex->mutex);
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					// Test if another signal caused the current thread to be suspended, in which case it needs to wait until the thread wakes up (otherwise the signal may cause unexpected results)
+					finished = false;
+					return;
+				}
+
+				for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
+				{
+					if (cpu->state & cpu_flag::again)
+					{
+						ppu.state += cpu_flag::again;
+						return;
+					}
+				}
+
+				cpu_thread* result = nullptr;
+				auto sq = cond.sq;
+				atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
+
+				while (const auto cpu = cond.schedule<ppu_thread>(sq, SYS_SYNC_PRIORITY))
+				{
+					if (cond.mutex->try_own(*cpu))
+					{
+						ensure(!std::exchange(result, cpu));
+					}
+				}
+
+				if (result)
+				{
+					cond.awake(result);
+				}
+			}
+			else
+			{
+				cond.mutex->mutex.lock_unlock();
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					finished = false;
+				}
+			}
+		});
+
+		if (!finished)
+		{
+			continue;
+		}
+
+		if (!cond)
+		{
+			return CELL_ESRCH;
+		}
+
+		return CELL_OK;
+	}
 }

 error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
@@ -229,53 +297,88 @@ error_code sys_cond_signal_to(ppu_thread& ppu, u32 cond_id, u32 thread_id)
 	sys_cond.trace("sys_cond_signal_to(cond_id=0x%x, thread_id=0x%x)", cond_id, thread_id);

-	const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond) -> int
-	{
-		if (!idm::check_unlocked<named_thread<ppu_thread>>(thread_id))
-		{
-			return -1;
-		}
-
-		if (atomic_storage<ppu_thread*>::load(cond.sq))
-		{
-			std::lock_guard lock(cond.mutex->mutex);
-
-			for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
-			{
-				if (cpu->id == thread_id)
-				{
-					if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
-					{
-						ppu.state += cpu_flag::again;
-						return 0;
-					}
-
-					ensure(cond.unqueue(cond.sq, cpu));
-
-					if (cond.mutex->try_own(*cpu))
-					{
-						cond.awake(cpu);
-					}
-
-					return 1;
-				}
-			}
-		}
-
-		return 0;
-	});
-
-	if (!cond || cond.ret == -1)
-	{
-		return CELL_ESRCH;
-	}
-
-	if (!cond.ret)
-	{
-		return not_an_error(CELL_EPERM);
-	}
-
-	return CELL_OK;
+	while (true)
+	{
+		if (ppu.test_stopped())
+		{
+			ppu.state += cpu_flag::again;
+			return {};
+		}
+
+		bool finished = true;
+
+		ppu.state += cpu_flag::wait;
+
+		const auto cond = idm::check<lv2_obj, lv2_cond>(cond_id, [&, notify = lv2_obj::notify_all_t()](lv2_cond& cond)
+		{
+			if (!idm::check_unlocked<named_thread<ppu_thread>>(thread_id))
+			{
+				return -1;
+			}
+
+			if (atomic_storage<ppu_thread*>::load(cond.sq))
+			{
+				std::lock_guard lock(cond.mutex->mutex);
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					// Test if another signal caused the current thread to be suspended, in which case it needs to wait until the thread wakes up (otherwise the signal may cause unexpected results)
+					finished = false;
+					return 0;
+				}
+
+				for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
+				{
+					if (cpu->id == thread_id)
+					{
+						if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
+						{
+							ppu.state += cpu_flag::again;
+							return 0;
+						}
+
+						ensure(cond.unqueue(cond.sq, cpu));
+
+						if (cond.mutex->try_own(*cpu))
+						{
+							cond.awake(cpu);
+						}
+
+						return 1;
+					}
+				}
+			}
+			else
+			{
+				cond.mutex->mutex.lock_unlock();
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					finished = false;
+					return 0;
+				}
+			}
+
+			return 0;
+		});
+
+		if (!finished)
+		{
+			continue;
+		}
+
+		if (!cond || cond.ret == -1)
+		{
+			return CELL_ESRCH;
+		}
+
+		if (!cond.ret)
+		{
+			return not_an_error(CELL_EPERM);
+		}
+
+		return CELL_OK;
+	}
 }

 error_code sys_cond_wait(ppu_thread& ppu, u32 cond_id, u64 timeout)

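One detail shared by the sys_cond hunks above: a signalled waiter is not made runnable directly. It is first offered the mutex through try_own, and only wakes immediately when the mutex was free; otherwise it stays parked on the mutex's queue. A compilable toy model of that handoff (toy_mutex and toy_thread are stand-ins, not RPCS3's lv2 types):

#include <deque>

struct toy_thread { bool runnable = false; };

struct toy_mutex
{
	toy_thread* owner = nullptr;
	std::deque<toy_thread*> sleep_queue;

	// Mirrors the shape of try_own(): true means the waiter took the lock
	// and may be awoken now; false means it was queued behind the owner.
	bool try_own(toy_thread& t)
	{
		if (!owner)
		{
			owner = &t;
			return true;
		}

		sleep_queue.push_back(&t);
		return false;
	}
};

void signal_one(toy_mutex& m, toy_thread& waiter)
{
	if (m.try_own(waiter))
	{
		waiter.runnable = true; // cond.awake(cpu) in the real code
	}
}
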
sys_lwcond.cpp

@@ -155,64 +155,80 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
 		fmt::throw_exception("Unknown mode (%d)", mode);
 	}

-	const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&, notify = lv2_obj::notify_all_t()](lv2_lwcond& cond) -> int
-	{
-		ppu_thread* cpu = nullptr;
-
-		if (ppu_thread_id != u32{umax})
-		{
-			cpu = idm::check_unlocked<named_thread<ppu_thread>>(static_cast<u32>(ppu_thread_id));
-
-			if (!cpu)
-			{
-				return -1;
-			}
-		}
-
-		lv2_lwmutex* mutex = nullptr;
-
-		if (mode != 2)
-		{
-			mutex = idm::check_unlocked<lv2_obj, lv2_lwmutex>(lwmutex_id);
-
-			if (!mutex)
-			{
-				return -1;
-			}
-		}
-
-		if (atomic_storage<ppu_thread*>::load(cond.sq))
-		{
-			std::lock_guard lock(cond.mutex);
-
-			if (cpu)
-			{
-				if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
-				{
-					ppu.state += cpu_flag::again;
-					return 0;
-				}
-			}
-
-			auto result = cpu ? cond.unqueue(cond.sq, cpu) :
-				cond.schedule<ppu_thread>(cond.sq, cond.protocol);
-
-			if (result)
-			{
-				if (static_cast<ppu_thread*>(result)->state & cpu_flag::again)
-				{
-					ppu.state += cpu_flag::again;
-					return 0;
-				}
-
-				if (mode == 2)
-				{
-					static_cast<ppu_thread*>(result)->gpr[3] = CELL_EBUSY;
-				}
-
-				if (mode != 2)
-				{
-					if (mode == 3 && mutex->load_sq()) [[unlikely]]
-					{
-						std::lock_guard lock(mutex->mutex);
+	while (true)
+	{
+		if (ppu.test_stopped())
+		{
+			ppu.state += cpu_flag::again;
+			return {};
+		}
+
+		bool finished = true;
+
+		ppu.state += cpu_flag::wait;
+
+		const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&, notify = lv2_obj::notify_all_t()](lv2_lwcond& cond) -> int
+		{
+			ppu_thread* cpu = nullptr;
+
+			if (ppu_thread_id != u32{umax})
+			{
+				cpu = idm::check_unlocked<named_thread<ppu_thread>>(static_cast<u32>(ppu_thread_id));
+
+				if (!cpu)
+				{
+					return -1;
+				}
+			}
+
+			lv2_lwmutex* mutex = nullptr;
+
+			if (mode != 2)
+			{
+				mutex = idm::check_unlocked<lv2_obj, lv2_lwmutex>(lwmutex_id);
+
+				if (!mutex)
+				{
+					return -1;
+				}
+			}
+
+			if (atomic_storage<ppu_thread*>::load(cond.sq))
+			{
+				std::lock_guard lock(cond.mutex);
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					// Test if another signal caused the current thread to be suspended, in which case it needs to wait until the thread wakes up (otherwise the signal may cause unexpected results)
+					finished = false;
+					return 0;
+				}
+
+				if (cpu)
+				{
+					if (static_cast<ppu_thread*>(cpu)->state & cpu_flag::again)
+					{
+						ppu.state += cpu_flag::again;
+						return 0;
+					}
+				}
+
+				auto result = cpu ? cond.unqueue(cond.sq, cpu) :
+					cond.schedule<ppu_thread>(cond.sq, cond.protocol);
+
+				if (result)
+				{
+					if (static_cast<ppu_thread*>(result)->state & cpu_flag::again)
+					{
+						ppu.state += cpu_flag::again;
+						return 0;
+					}
+
+					if (mode == 2)
+					{
+						static_cast<ppu_thread*>(result)->gpr[3] = CELL_EBUSY;
+					}
+					else if (mode == 3 && mutex->load_sq()) [[unlikely]]
+					{
+						std::lock_guard lock(mutex->mutex);
@@ -237,43 +253,58 @@ error_code _sys_lwcond_signal(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u6
 						mutex->try_own(result, true);
 						result = nullptr;
 					}
-				}
-
-				if (result)
-				{
-					cond.awake(result);
-				}
-
-				return 1;
-			}
-		}
-
-		return 0;
-	});
-
-	if (!cond || cond.ret == -1)
-	{
-		return CELL_ESRCH;
-	}
-
-	if (!cond.ret)
-	{
-		if (ppu_thread_id == u32{umax})
-		{
-			if (mode == 3)
-			{
-				return not_an_error(CELL_ENOENT);
-			}
-			else if (mode == 2)
-			{
-				return CELL_OK;
-			}
-		}
-
-		return not_an_error(CELL_EPERM);
-	}
-
-	return CELL_OK;
+
+					if (result)
+					{
+						cond.awake(result);
+					}
+
+					return 1;
+				}
+			}
+			else
+			{
+				cond.mutex.lock_unlock();
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					finished = false;
+					return 0;
+				}
+			}
+
+			return 0;
+		});
+
+		if (!finished)
+		{
+			continue;
+		}
+
+		if (!cond || cond.ret == -1)
+		{
+			return CELL_ESRCH;
+		}
+
+		if (!cond.ret)
+		{
+			if (ppu_thread_id == u32{umax})
+			{
+				if (mode == 3)
+				{
+					return not_an_error(CELL_ENOENT);
+				}
+				else if (mode == 2)
+				{
+					return CELL_OK;
+				}
+			}
+
+			return not_an_error(CELL_EPERM);
+		}
+
+		return CELL_OK;
+	}
 }

 error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u32 mode)
@@ -290,80 +321,115 @@ error_code _sys_lwcond_signal_all(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id
 		fmt::throw_exception("Unknown mode (%d)", mode);
 	}

-	const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&, notify = lv2_obj::notify_all_t()](lv2_lwcond& cond) -> s32
-	{
-		lv2_lwmutex* mutex{};
-
-		if (mode != 2)
-		{
-			mutex = idm::check_unlocked<lv2_obj, lv2_lwmutex>(lwmutex_id);
-
-			if (!mutex)
-			{
-				return -1;
-			}
-		}
-
-		if (atomic_storage<ppu_thread*>::load(cond.sq))
-		{
-			std::lock_guard lock(cond.mutex);
-
-			u32 result = 0;
-
-			for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
-			{
-				if (cpu->state & cpu_flag::again)
-				{
-					ppu.state += cpu_flag::again;
-					return 0;
-				}
-			}
-
-			auto sq = cond.sq;
-			atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
-
-			while (const auto cpu = cond.schedule<ppu_thread>(sq, cond.protocol))
-			{
-				if (mode == 2)
-				{
-					static_cast<ppu_thread*>(cpu)->gpr[3] = CELL_EBUSY;
-				}
-
-				if (mode == 1)
-				{
-					mutex->try_own(cpu, true);
-				}
-				else
-				{
-					lv2_obj::append(cpu);
-				}
-
-				result++;
-			}
-
-			if (result && mode == 2)
-			{
-				lv2_obj::awake_all();
-			}
-
-			return result;
-		}
-
-		return 0;
-	});
-
-	if (!cond || cond.ret == -1)
-	{
-		return CELL_ESRCH;
-	}
-
-	if (mode == 1)
-	{
-		// Mode 1: return the number of threads (TODO)
-		return not_an_error(cond.ret);
-	}
-
-	return CELL_OK;
+	while (true)
+	{
+		if (ppu.test_stopped())
+		{
+			ppu.state += cpu_flag::again;
+			return {};
+		}
+
+		bool finished = true;
+
+		ppu.state += cpu_flag::wait;
+
+		const auto cond = idm::check<lv2_obj, lv2_lwcond>(lwcond_id, [&, notify = lv2_obj::notify_all_t()](lv2_lwcond& cond) -> int
+		{
+			lv2_lwmutex* mutex{};
+
+			if (mode != 2)
+			{
+				mutex = idm::check_unlocked<lv2_obj, lv2_lwmutex>(lwmutex_id);
+
+				if (!mutex)
+				{
+					return -1;
+				}
+			}
+
+			if (atomic_storage<ppu_thread*>::load(cond.sq))
+			{
+				std::lock_guard lock(cond.mutex);
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					// Test if another signal caused the current thread to be suspended, in which case it needs to wait until the thread wakes up (otherwise the signal may cause unexpected results)
+					finished = false;
+					return 0;
+				}
+
+				u32 result = 0;
+
+				for (auto cpu = +cond.sq; cpu; cpu = cpu->next_cpu)
+				{
+					if (cpu->state & cpu_flag::again)
+					{
+						ppu.state += cpu_flag::again;
+						return 0;
+					}
+				}
+
+				auto sq = cond.sq;
+				atomic_storage<ppu_thread*>::release(cond.sq, nullptr);
+
+				while (const auto cpu = cond.schedule<ppu_thread>(sq, cond.protocol))
+				{
+					if (mode == 2)
+					{
+						static_cast<ppu_thread*>(cpu)->gpr[3] = CELL_EBUSY;
+					}
+
+					if (mode == 1)
+					{
+						mutex->try_own(cpu, true);
+					}
+					else
+					{
+						lv2_obj::append(cpu);
+					}
+
+					result++;
+				}
+
+				if (result && mode == 2)
+				{
+					lv2_obj::awake_all();
+				}
+
+				return result;
+			}
+			else
+			{
+				cond.mutex.lock_unlock();
+
+				if (ppu.state & cpu_flag::suspend)
+				{
+					finished = false;
+					return 0;
+				}
+			}
+
+			return 0;
+		});
+
+		if (!finished)
+		{
+			continue;
+		}
+
+		if (!cond || cond.ret == -1)
+		{
+			return CELL_ESRCH;
+		}
+
+		if (mode == 1)
+		{
+			// Mode 1: return the number of threads (TODO)
+			return not_an_error(cond.ret);
+		}
+
+		return CELL_OK;
+	}
 }

 error_code _sys_lwcond_queue_wait(ppu_thread& ppu, u32 lwcond_id, u32 lwmutex_id, u64 timeout)

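The signal_all variants drain the queue in two steps, as seen above: detach the whole sleep queue atomically so late arrivals start a fresh queue, then dispose of each detached waiter according to mode. A compilable toy version of that drain loop (the waiter struct and its flags are stand-ins for RPCS3's cpu_thread machinery):

#include <utility>

struct waiter
{
	waiter* next = nullptr;
	bool requeued = false; // stand-in for "moved onto the lwmutex queue"
	bool runnable = false; // stand-in for "appended to the run queue"
};

int drain_queue(waiter*& queue_head, int mode)
{
	// Detach the whole queue first, mirroring
	// atomic_storage<...>::release(cond.sq, nullptr) in the hunk above.
	waiter* sq = std::exchange(queue_head, nullptr);

	int woken = 0;

	while (waiter* w = sq)
	{
		sq = w->next;

		if (mode == 1)
			w->requeued = true; // waiter must re-acquire the lwmutex
		else
			w->runnable = true; // waiter resumes (mode 2 also sets EBUSY)

		++woken;
	}

	return woken;
}
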
sys_ppu_thread.cpp

@@ -338,29 +338,60 @@ error_code sys_ppu_thread_get_priority(ppu_thread& ppu, u32 thread_id, vm::ptr<s
 	ppu.state += cpu_flag::wait;

 	sys_ppu_thread.trace("sys_ppu_thread_get_priority(thread_id=0x%x, priop=*0x%x)", thread_id, priop);

+	u32 prio{};
+
 	if (thread_id == ppu.id)
 	{
 		// Fast path for self
+		for (; !ppu.is_stopped(); std::this_thread::yield())
+		{
+			if (reader_lock lock(lv2_obj::g_mutex); cpu_flag::suspend - ppu.state)
+			{
+				prio = ppu.prio.load().prio;
+				break;
+			}
+
+			ppu.check_state();
+			ppu.state += cpu_flag::wait;
+		}
+
 		ppu.check_state();
-		*priop = ppu.prio.load().prio;
+		*priop = prio;
 		return CELL_OK;
 	}

-	u32 prio{};
-
-	const auto thread = idm::check<named_thread<ppu_thread>>(thread_id, [&](ppu_thread& thread)
+	for (; !ppu.is_stopped(); std::this_thread::yield())
 	{
-		prio = thread.prio.load().prio;
-	});
+		bool check_state = false;
+
+		const auto thread = idm::check<named_thread<ppu_thread>>(thread_id, [&](ppu_thread& thread)
+		{
+			if (reader_lock lock(lv2_obj::g_mutex); cpu_flag::suspend - ppu.state)
+			{
+				prio = thread.prio.load().prio;
+			}
+			else
+			{
+				check_state = true;
+			}
+		});
+
+		if (check_state)
+		{
+			ppu.check_state();
+			ppu.state += cpu_flag::wait;
+			continue;
+		}

-	if (!thread)
-	{
-		return CELL_ESRCH;
+		if (!thread)
+		{
+			return CELL_ESRCH;
+		}
+
+		ppu.check_state();
+		*priop = prio;
+		break;
 	}

-	ppu.check_state();
-	*priop = prio;
-
 	return CELL_OK;
 }
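
sys_ppu_thread_get_priority takes a different route: it samples the priority only while holding the scheduler's reader lock with the thread not suspended, and otherwise yields and retries. A rough standard-library equivalent (g_sched_lock and is_suspended stand in for lv2_obj::g_mutex and the cpu_flag::suspend test; illustration only):

#include <atomic>
#include <shared_mutex>
#include <thread>

std::shared_mutex g_sched_lock; // stand-in for lv2_obj::g_mutex

int read_priority_consistently(const std::atomic<int>& prio,
                               const std::atomic<bool>& is_suspended)
{
	while (true)
	{
		{
			std::shared_lock lock(g_sched_lock);

			if (!is_suspended.load())
			{
				// While the reader lock is held the scheduler cannot be
				// mid-update, so this read is consistent.
				return prio.load();
			}
		}

		// Suspended between syscall entry and the read: let the scheduler
		// finish, then retry (the diff uses check_state() plus yield).
		std::this_thread::yield();
	}
}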