mirror of
https://gitlab.com/OpenMW/openmw.git
synced 2025-02-20 15:40:32 +00:00
Remove redundant job distribution between threads
Instead don't take jobs from queue until job for the same tile is processing.
This commit is contained in:
parent
77e58abf0d
commit
431501e23a
@ -102,7 +102,6 @@ namespace DetourNavigator
|
||||
{
|
||||
mShouldStop = true;
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
mThreadsQueues.clear();
|
||||
mWaiting.clear();
|
||||
mHasJob.notify_all();
|
||||
lock.unlock();
|
||||
@ -340,64 +339,38 @@ namespace DetourNavigator
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
|
||||
const auto threadId = std::this_thread::get_id();
|
||||
auto& threadQueue = mThreadsQueues[threadId];
|
||||
|
||||
while (true)
|
||||
bool shouldStop = false;
|
||||
const auto hasJob = [&]
|
||||
{
|
||||
bool shouldStop = false;
|
||||
shouldStop = mShouldStop;
|
||||
return shouldStop
|
||||
|| (!mWaiting.empty() && mWaiting.front()->mProcessTime <= std::chrono::steady_clock::now());
|
||||
};
|
||||
|
||||
const auto hasJob = [&] {
|
||||
shouldStop = mShouldStop;
|
||||
return shouldStop
|
||||
|| (!mWaiting.empty() && mWaiting.front()->mProcessTime <= std::chrono::steady_clock::now())
|
||||
|| !threadQueue.empty();
|
||||
};
|
||||
|
||||
if (!mHasJob.wait_for(lock, std::chrono::milliseconds(10), hasJob))
|
||||
{
|
||||
if (mJobs.empty())
|
||||
mDone.notify_all();
|
||||
return mJobs.end();
|
||||
}
|
||||
|
||||
if (shouldStop)
|
||||
return mJobs.end();
|
||||
|
||||
Log(Debug::Debug) << "Got " << mJobs.size() << " navigator jobs and "
|
||||
<< threadQueue.size() << " thread jobs by thread=" << std::this_thread::get_id();
|
||||
|
||||
const JobIt job = threadQueue.empty()
|
||||
? getJob(mWaiting, true)
|
||||
: getJob(threadQueue, false);
|
||||
|
||||
if (job == mJobs.end())
|
||||
continue;
|
||||
|
||||
const auto owner = lockTile(job->mAgentHalfExtents, job->mChangedTile);
|
||||
|
||||
if (owner == threadId)
|
||||
{
|
||||
mPushed.erase(getAgentAndTile(*job));
|
||||
return job;
|
||||
}
|
||||
|
||||
postThreadJob(job, mThreadsQueues[owner]);
|
||||
if (!mHasJob.wait_for(lock, std::chrono::milliseconds(10), hasJob))
|
||||
{
|
||||
if (mJobs.empty())
|
||||
mDone.notify_all();
|
||||
return mJobs.end();
|
||||
}
|
||||
}
|
||||
|
||||
JobIt AsyncNavMeshUpdater::getJob(std::deque<JobIt>& jobs, bool changeLastUpdate)
|
||||
{
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
JobIt job = jobs.front();
|
||||
|
||||
if (job->mProcessTime > now)
|
||||
if (shouldStop)
|
||||
return mJobs.end();
|
||||
|
||||
jobs.pop_front();
|
||||
const JobIt job = mWaiting.front();
|
||||
|
||||
if (changeLastUpdate && job->mChangeType == ChangeType::update)
|
||||
mLastUpdates[getAgentAndTile(*job)] = now;
|
||||
mWaiting.pop_front();
|
||||
|
||||
if (!lockTile(job->mAgentHalfExtents, job->mChangedTile))
|
||||
{
|
||||
++job->mTryNumber;
|
||||
insertPrioritizedJob(job, mWaiting);
|
||||
return mJobs.end();
|
||||
}
|
||||
|
||||
if (job->mChangeType == ChangeType::update)
|
||||
mLastUpdates[getAgentAndTile(*job)] = std::chrono::steady_clock::now();
|
||||
mPushed.erase(getAgentAndTile(*job));
|
||||
|
||||
return job;
|
||||
}
|
||||
@ -435,7 +408,7 @@ namespace DetourNavigator
|
||||
if (mPushed.emplace(job->mAgentHalfExtents, job->mChangedTile).second)
|
||||
{
|
||||
++job->mTryNumber;
|
||||
mWaiting.push_back(job);
|
||||
insertPrioritizedJob(job, mWaiting);
|
||||
mHasJob.notify_all();
|
||||
return;
|
||||
}
|
||||
@ -443,26 +416,11 @@ namespace DetourNavigator
|
||||
mJobs.erase(job);
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::postThreadJob(JobIt job, std::deque<JobIt>& queue)
|
||||
{
|
||||
queue.push_back(job);
|
||||
mHasJob.notify_all();
|
||||
}
|
||||
|
||||
std::thread::id AsyncNavMeshUpdater::lockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile)
|
||||
bool AsyncNavMeshUpdater::lockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile)
|
||||
{
|
||||
if (mSettings.get().mAsyncNavMeshUpdaterThreads <= 1)
|
||||
return std::this_thread::get_id();
|
||||
|
||||
auto locked = mProcessingTiles.lock();
|
||||
const auto tile = locked->find(std::make_tuple(agentHalfExtents, changedTile));
|
||||
if (tile == locked->end())
|
||||
{
|
||||
const auto threadId = std::this_thread::get_id();
|
||||
locked->emplace(std::tie(agentHalfExtents, changedTile), threadId);
|
||||
return threadId;
|
||||
}
|
||||
return tile->second;
|
||||
return true;
|
||||
return mProcessingTiles.lock()->emplace(agentHalfExtents, changedTile).second;
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::unlockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile)
|
||||
|
@ -99,10 +99,9 @@ namespace DetourNavigator
|
||||
std::set<std::tuple<osg::Vec3f, TilePosition>> mPushed;
|
||||
Misc::ScopeGuarded<TilePosition> mPlayerTile;
|
||||
NavMeshTilesCache mNavMeshTilesCache;
|
||||
Misc::ScopeGuarded<std::map<std::tuple<osg::Vec3f, TilePosition>, std::thread::id>> mProcessingTiles;
|
||||
Misc::ScopeGuarded<std::set<std::tuple<osg::Vec3f, TilePosition>>> mProcessingTiles;
|
||||
std::map<std::tuple<osg::Vec3f, TilePosition>, std::chrono::steady_clock::time_point> mLastUpdates;
|
||||
std::set<std::tuple<osg::Vec3f, TilePosition>> mPresentTiles;
|
||||
std::map<std::thread::id, std::deque<JobIt>> mThreadsQueues;
|
||||
std::vector<std::thread> mThreads;
|
||||
|
||||
void process() noexcept;
|
||||
@ -119,7 +118,7 @@ namespace DetourNavigator
|
||||
|
||||
void repost(JobIt job);
|
||||
|
||||
std::thread::id lockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile);
|
||||
bool lockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile);
|
||||
|
||||
void unlockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user