mirror of
https://gitlab.com/OpenMW/openmw.git
synced 2025-02-20 06:40:09 +00:00
Store jobs in the same container until they are processed
Push to queue and reorder only iterators.
This commit is contained in:
parent
8db640289c
commit
902b0f9f84
@ -20,6 +20,8 @@ namespace
|
||||
using DetourNavigator::TilePosition;
|
||||
using DetourNavigator::UpdateType;
|
||||
using DetourNavigator::ChangeType;
|
||||
using DetourNavigator::Job;
|
||||
using DetourNavigator::JobIt;
|
||||
|
||||
int getManhattanDistance(const TilePosition& lhs, const TilePosition& rhs)
|
||||
{
|
||||
@ -44,6 +46,25 @@ namespace
|
||||
return UpdateType::Temporary;
|
||||
return UpdateType::Persistent;
|
||||
}
|
||||
|
||||
auto getPriority(const Job& job) noexcept
|
||||
{
|
||||
return std::make_tuple(job.mProcessTime, job.mTryNumber, job.mChangeType, job.mDistanceToPlayer, job.mDistanceToOrigin);
|
||||
}
|
||||
|
||||
struct LessByJobPriority
|
||||
{
|
||||
bool operator()(JobIt lhs, JobIt rhs) const noexcept
|
||||
{
|
||||
return getPriority(*lhs) < getPriority(*rhs);
|
||||
}
|
||||
};
|
||||
|
||||
void insertPrioritizedJob(JobIt job, std::deque<JobIt>& queue)
|
||||
{
|
||||
const auto it = std::upper_bound(queue.begin(), queue.end(), job, LessByJobPriority {});
|
||||
queue.insert(it, job);
|
||||
}
|
||||
}
|
||||
|
||||
namespace DetourNavigator
|
||||
@ -64,7 +85,8 @@ namespace DetourNavigator
|
||||
{
|
||||
mShouldStop = true;
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
mJobs = decltype(mJobs)();
|
||||
mThreadsQueues.clear();
|
||||
mWaiting.clear();
|
||||
mHasJob.notify_all();
|
||||
lock.unlock();
|
||||
for (auto& thread : mThreads)
|
||||
@ -88,8 +110,8 @@ namespace DetourNavigator
|
||||
const std::lock_guard<std::mutex> lock(mMutex);
|
||||
|
||||
if (playerTileChanged)
|
||||
for (auto& job : mJobs)
|
||||
job.mDistanceToPlayer = getManhattanDistance(job.mChangedTile, playerTile);
|
||||
for (JobIt job : mWaiting)
|
||||
job->mDistanceToPlayer = getManhattanDistance(job->mChangedTile, playerTile);
|
||||
|
||||
for (const auto& changedTile : changedTiles)
|
||||
{
|
||||
@ -108,24 +130,21 @@ namespace DetourNavigator
|
||||
? mLastUpdates[job.mAgentHalfExtents][job.mChangedTile] + mSettings.get().mMinUpdateInterval
|
||||
: std::chrono::steady_clock::time_point();
|
||||
|
||||
const JobIt it = mJobs.insert(mJobs.end(), std::move(job));
|
||||
|
||||
if (playerTileChanged)
|
||||
{
|
||||
mJobs.push_back(std::move(job));
|
||||
}
|
||||
mWaiting.push_back(it);
|
||||
else
|
||||
{
|
||||
const auto it = std::upper_bound(mJobs.begin(), mJobs.end(), job);
|
||||
mJobs.insert(it, std::move(job));
|
||||
}
|
||||
insertPrioritizedJob(it, mWaiting);
|
||||
}
|
||||
}
|
||||
|
||||
if (playerTileChanged)
|
||||
std::sort(mJobs.begin(), mJobs.end());
|
||||
std::sort(mWaiting.begin(), mWaiting.end(), LessByJobPriority {});
|
||||
|
||||
Log(Debug::Debug) << "Posted " << mJobs.size() << " navigator jobs";
|
||||
|
||||
if (!mJobs.empty())
|
||||
if (!mWaiting.empty())
|
||||
mHasJob.notify_all();
|
||||
}
|
||||
|
||||
@ -166,7 +185,7 @@ namespace DetourNavigator
|
||||
int minDistanceToPlayer = 0;
|
||||
const auto isDone = [&]
|
||||
{
|
||||
jobsLeft = mJobs.size() + getTotalThreadJobsUnsafe();
|
||||
jobsLeft = mJobs.size();
|
||||
if (jobsLeft == 0)
|
||||
{
|
||||
minDistanceToPlayer = 0;
|
||||
@ -199,7 +218,7 @@ namespace DetourNavigator
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
mDone.wait(lock, [this] { return mJobs.size() + getTotalThreadJobsUnsafe() == 0; });
|
||||
mDone.wait(lock, [this] { return mJobs.size() == 0; });
|
||||
}
|
||||
mProcessingTiles.wait(mProcessed, [] (const auto& v) { return v.empty(); });
|
||||
}
|
||||
@ -210,7 +229,7 @@ namespace DetourNavigator
|
||||
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock(mMutex);
|
||||
jobs = mJobs.size() + getTotalThreadJobsUnsafe();
|
||||
jobs = mJobs.size();
|
||||
}
|
||||
|
||||
stats.setAttribute(frameNumber, "NavMesh UpdateJobs", jobs);
|
||||
@ -226,12 +245,14 @@ namespace DetourNavigator
|
||||
{
|
||||
try
|
||||
{
|
||||
if (auto job = getNextJob())
|
||||
if (JobIt job = getNextJob(); job != mJobs.end())
|
||||
{
|
||||
const auto processed = processJob(*job);
|
||||
unlockTile(job->mAgentHalfExtents, job->mChangedTile);
|
||||
if (!processed)
|
||||
repost(std::move(*job));
|
||||
if (processed)
|
||||
removeJob(job);
|
||||
else
|
||||
repost(job);
|
||||
}
|
||||
else
|
||||
cleanupLastUpdates();
|
||||
@ -306,7 +327,7 @@ namespace DetourNavigator
|
||||
return isSuccess(status);
|
||||
}
|
||||
|
||||
std::optional<AsyncNavMeshUpdater::Job> AsyncNavMeshUpdater::getNextJob()
|
||||
JobIt AsyncNavMeshUpdater::getNextJob()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
|
||||
@ -316,25 +337,25 @@ namespace DetourNavigator
|
||||
while (true)
|
||||
{
|
||||
const auto hasJob = [&] {
|
||||
return (!mJobs.empty() && mJobs.front().mProcessTime <= std::chrono::steady_clock::now())
|
||||
return (!mWaiting.empty() && mWaiting.front()->mProcessTime <= std::chrono::steady_clock::now())
|
||||
|| !threadQueue.empty();
|
||||
};
|
||||
|
||||
if (!mHasJob.wait_for(lock, std::chrono::milliseconds(10), hasJob))
|
||||
{
|
||||
if (mJobs.empty() && getTotalThreadJobsUnsafe() == 0)
|
||||
if (mJobs.empty())
|
||||
mDone.notify_all();
|
||||
return std::nullopt;
|
||||
return mJobs.end();
|
||||
}
|
||||
|
||||
Log(Debug::Debug) << "Got " << mJobs.size() << " navigator jobs and "
|
||||
<< threadQueue.size() << " thread jobs by thread=" << std::this_thread::get_id();
|
||||
|
||||
auto job = threadQueue.empty()
|
||||
? getJob(mJobs, true)
|
||||
const JobIt job = threadQueue.empty()
|
||||
? getJob(mWaiting, true)
|
||||
: getJob(threadQueue, false);
|
||||
|
||||
if (!job)
|
||||
if (job == mJobs.end())
|
||||
continue;
|
||||
|
||||
const auto owner = lockTile(job->mAgentHalfExtents, job->mChangedTile);
|
||||
@ -351,22 +372,22 @@ namespace DetourNavigator
|
||||
return job;
|
||||
}
|
||||
|
||||
postThreadJob(std::move(*job), mThreadsQueues[owner]);
|
||||
postThreadJob(job, mThreadsQueues[owner]);
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<AsyncNavMeshUpdater::Job> AsyncNavMeshUpdater::getJob(Jobs& jobs, bool changeLastUpdate)
|
||||
JobIt AsyncNavMeshUpdater::getJob(std::deque<JobIt>& jobs, bool changeLastUpdate)
|
||||
{
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
JobIt job = jobs.front();
|
||||
|
||||
if (jobs.front().mProcessTime > now)
|
||||
return {};
|
||||
if (job->mProcessTime > now)
|
||||
return mJobs.end();
|
||||
|
||||
Job job = jobs.front();
|
||||
jobs.pop_front();
|
||||
|
||||
if (changeLastUpdate && job.mChangeType == ChangeType::update)
|
||||
mLastUpdates[job.mAgentHalfExtents][job.mChangedTile] = now;
|
||||
if (changeLastUpdate && job->mChangeType == ChangeType::update)
|
||||
mLastUpdates[job->mAgentHalfExtents][job->mChangedTile] = now;
|
||||
|
||||
return job;
|
||||
}
|
||||
@ -394,24 +415,27 @@ namespace DetourNavigator
|
||||
writeToFile(shared->lockConst()->getImpl(), mSettings.get().mNavMeshPathPrefix, navMeshRevision);
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::repost(Job&& job)
|
||||
void AsyncNavMeshUpdater::repost(JobIt job)
|
||||
{
|
||||
if (mShouldStop || job.mTryNumber > 2)
|
||||
if (mShouldStop || job->mTryNumber > 2)
|
||||
return;
|
||||
|
||||
const std::lock_guard<std::mutex> lock(mMutex);
|
||||
|
||||
if (mPushed[job.mAgentHalfExtents].insert(job.mChangedTile).second)
|
||||
if (mPushed[job->mAgentHalfExtents].insert(job->mChangedTile).second)
|
||||
{
|
||||
++job.mTryNumber;
|
||||
mJobs.push_back(std::move(job));
|
||||
++job->mTryNumber;
|
||||
mWaiting.push_back(job);
|
||||
mHasJob.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
mJobs.erase(job);
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::postThreadJob(Job&& job, Jobs& queue)
|
||||
void AsyncNavMeshUpdater::postThreadJob(JobIt job, std::deque<JobIt>& queue)
|
||||
{
|
||||
queue.push_back(std::move(job));
|
||||
queue.push_back(job);
|
||||
mHasJob.notify_all();
|
||||
}
|
||||
|
||||
@ -468,13 +492,7 @@ namespace DetourNavigator
|
||||
std::size_t AsyncNavMeshUpdater::getTotalJobs() const
|
||||
{
|
||||
const std::scoped_lock lock(mMutex);
|
||||
return mJobs.size() + getTotalThreadJobsUnsafe();
|
||||
}
|
||||
|
||||
std::size_t AsyncNavMeshUpdater::getTotalThreadJobsUnsafe() const
|
||||
{
|
||||
return std::accumulate(mThreadsQueues.begin(), mThreadsQueues.end(), std::size_t(0),
|
||||
[] (auto r, const auto& v) { return r + v.second.size(); });
|
||||
return mJobs.size();
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::cleanupLastUpdates()
|
||||
@ -499,4 +517,10 @@ namespace DetourNavigator
|
||||
++agent;
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncNavMeshUpdater::removeJob(JobIt job)
|
||||
{
|
||||
const std::lock_guard lock(mMutex);
|
||||
mJobs.erase(job);
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include <set>
|
||||
#include <thread>
|
||||
#include <tuple>
|
||||
#include <list>
|
||||
|
||||
class dtNavMesh;
|
||||
|
||||
@ -52,6 +53,20 @@ namespace DetourNavigator
|
||||
return stream << "ChangeType::" << static_cast<int>(value);
|
||||
}
|
||||
|
||||
struct Job
|
||||
{
|
||||
osg::Vec3f mAgentHalfExtents;
|
||||
std::weak_ptr<GuardedNavMeshCacheItem> mNavMeshCacheItem;
|
||||
TilePosition mChangedTile;
|
||||
unsigned mTryNumber;
|
||||
ChangeType mChangeType;
|
||||
int mDistanceToPlayer;
|
||||
int mDistanceToOrigin;
|
||||
std::chrono::steady_clock::time_point mProcessTime;
|
||||
};
|
||||
|
||||
using JobIt = std::list<Job>::iterator;
|
||||
|
||||
class AsyncNavMeshUpdater
|
||||
{
|
||||
public:
|
||||
@ -67,31 +82,6 @@ namespace DetourNavigator
|
||||
void reportStats(unsigned int frameNumber, osg::Stats& stats) const;
|
||||
|
||||
private:
|
||||
struct Job
|
||||
{
|
||||
osg::Vec3f mAgentHalfExtents;
|
||||
std::weak_ptr<GuardedNavMeshCacheItem> mNavMeshCacheItem;
|
||||
TilePosition mChangedTile;
|
||||
unsigned mTryNumber;
|
||||
ChangeType mChangeType;
|
||||
int mDistanceToPlayer;
|
||||
int mDistanceToOrigin;
|
||||
std::chrono::steady_clock::time_point mProcessTime;
|
||||
|
||||
std::tuple<std::chrono::steady_clock::time_point, unsigned, ChangeType, int, int> getPriority() const
|
||||
{
|
||||
return std::make_tuple(mProcessTime, mTryNumber, mChangeType, mDistanceToPlayer, mDistanceToOrigin);
|
||||
}
|
||||
|
||||
friend inline bool operator <(const Job& lhs, const Job& rhs)
|
||||
{
|
||||
return lhs.getPriority() < rhs.getPriority();
|
||||
}
|
||||
};
|
||||
|
||||
using Jobs = std::deque<Job>;
|
||||
using Pushed = std::map<osg::Vec3f, std::set<TilePosition>>;
|
||||
|
||||
std::reference_wrapper<const Settings> mSettings;
|
||||
std::reference_wrapper<TileCachedRecastMeshManager> mRecastMeshManager;
|
||||
std::reference_wrapper<OffMeshConnectionsManager> mOffMeshConnectionsManager;
|
||||
@ -100,29 +90,30 @@ namespace DetourNavigator
|
||||
std::condition_variable mHasJob;
|
||||
std::condition_variable mDone;
|
||||
std::condition_variable mProcessed;
|
||||
Jobs mJobs;
|
||||
std::list<Job> mJobs;
|
||||
std::deque<JobIt> mWaiting;
|
||||
std::map<osg::Vec3f, std::set<TilePosition>> mPushed;
|
||||
Misc::ScopeGuarded<TilePosition> mPlayerTile;
|
||||
NavMeshTilesCache mNavMeshTilesCache;
|
||||
Misc::ScopeGuarded<std::map<osg::Vec3f, std::map<TilePosition, std::thread::id>>> mProcessingTiles;
|
||||
std::map<osg::Vec3f, std::map<TilePosition, std::chrono::steady_clock::time_point>> mLastUpdates;
|
||||
std::set<std::tuple<osg::Vec3f, TilePosition>> mPresentTiles;
|
||||
std::map<std::thread::id, Jobs> mThreadsQueues;
|
||||
std::map<std::thread::id, std::deque<JobIt>> mThreadsQueues;
|
||||
std::vector<std::thread> mThreads;
|
||||
|
||||
void process() noexcept;
|
||||
|
||||
bool processJob(const Job& job);
|
||||
|
||||
std::optional<Job> getNextJob();
|
||||
JobIt getNextJob();
|
||||
|
||||
std::optional<Job> getJob(Jobs& jobs, bool changeLastUpdate);
|
||||
JobIt getJob(std::deque<JobIt>& jobs, bool changeLastUpdate);
|
||||
|
||||
void postThreadJob(Job&& job, Jobs& queue);
|
||||
void postThreadJob(JobIt job, std::deque<JobIt>& queue);
|
||||
|
||||
void writeDebugFiles(const Job& job, const RecastMesh* recastMesh) const;
|
||||
|
||||
void repost(Job&& job);
|
||||
void repost(JobIt job);
|
||||
|
||||
std::thread::id lockTile(const osg::Vec3f& agentHalfExtents, const TilePosition& changedTile);
|
||||
|
||||
@ -130,13 +121,13 @@ namespace DetourNavigator
|
||||
|
||||
inline std::size_t getTotalJobs() const;
|
||||
|
||||
inline std::size_t getTotalThreadJobsUnsafe() const;
|
||||
|
||||
void cleanupLastUpdates();
|
||||
|
||||
int waitUntilJobsDoneForNotPresentTiles(const std::size_t initialJobsLeft, std::size_t& maxJobsLeft, Loading::Listener& listener);
|
||||
|
||||
void waitUntilAllJobsDone();
|
||||
|
||||
inline void removeJob(JobIt job);
|
||||
};
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user