From b7e69cbc64fafd1f8ae255c4425f0e3b5cdcdd6e Mon Sep 17 00:00:00 2001 From: scrawl Date: Sat, 6 Feb 2016 22:29:06 +0100 Subject: [PATCH] Refactor WorkQueue, merge WorkTicket and WorkItem Allow the caller to hold on to the WorkItem. This makes it possible for a derived WorkItem to store the result of the work within the WorkItem itself. --- components/sceneutil/workqueue.cpp | 39 ++++++------- components/sceneutil/workqueue.hpp | 89 ++++++++++++++---------------- 2 files changed, 59 insertions(+), 69 deletions(-) diff --git a/components/sceneutil/workqueue.cpp b/components/sceneutil/workqueue.cpp index 26a392be45..bb1d1f1f70 100644 --- a/components/sceneutil/workqueue.cpp +++ b/components/sceneutil/workqueue.cpp @@ -1,9 +1,11 @@ #include "workqueue.hpp" +#include + namespace SceneUtil { -void WorkTicket::waitTillDone() +void WorkItem::waitTillDone() { if (mDone > 0) return; @@ -15,7 +17,7 @@ void WorkTicket::waitTillDone() } } -void WorkTicket::signalDone() +void WorkItem::signalDone() { { OpenThreads::ScopedLock lock(mMutex); @@ -25,23 +27,16 @@ void WorkTicket::signalDone() } WorkItem::WorkItem() - : mTicket(new WorkTicket) { - mTicket->setThreadSafeRefUnref(true); } WorkItem::~WorkItem() { } -void WorkItem::doWork() +bool WorkItem::isDone() const { - mTicket->signalDone(); -} - -osg::ref_ptr WorkItem::getTicket() -{ - return mTicket; + return (mDone > 0); } WorkQueue::WorkQueue(int workerThreads) @@ -60,11 +55,7 @@ WorkQueue::~WorkQueue() { OpenThreads::ScopedLock lock(mMutex); while (!mQueue.empty()) - { - WorkItem* item = mQueue.front(); - delete item; mQueue.pop(); - } mIsReleased = true; mCondition.broadcast(); } @@ -76,16 +67,20 @@ WorkQueue::~WorkQueue() } } -osg::ref_ptr WorkQueue::addWorkItem(WorkItem *item) +void WorkQueue::addWorkItem(osg::ref_ptr item) { - osg::ref_ptr ticket = item->getTicket(); + if (item->isDone()) + { + std::cerr << "warning, trying to add a work item that is already completed" << std::endl; + return; + } + OpenThreads::ScopedLock lock(mMutex); mQueue.push(item); mCondition.signal(); - return ticket; } -WorkItem *WorkQueue::removeWorkItem() +osg::ref_ptr WorkQueue::removeWorkItem() { OpenThreads::ScopedLock lock(mMutex); while (mQueue.empty() && !mIsReleased) @@ -94,7 +89,7 @@ WorkItem *WorkQueue::removeWorkItem() } if (mQueue.size()) { - WorkItem* item = mQueue.front(); + osg::ref_ptr item = mQueue.front(); mQueue.pop(); return item; } @@ -111,11 +106,11 @@ void WorkThread::run() { while (true) { - WorkItem* item = mWorkQueue->removeWorkItem(); + osg::ref_ptr item = mWorkQueue->removeWorkItem(); if (!item) return; item->doWork(); - delete item; + item->signalDone(); } } diff --git a/components/sceneutil/workqueue.hpp b/components/sceneutil/workqueue.hpp index 492bbd090e..06489dfbbc 100644 --- a/components/sceneutil/workqueue.hpp +++ b/components/sceneutil/workqueue.hpp @@ -14,37 +14,61 @@ namespace SceneUtil { - class WorkTicket : public osg::Referenced - { - public: - void waitTillDone(); - - void signalDone(); - - private: - OpenThreads::Atomic mDone; - OpenThreads::Mutex mMutex; - OpenThreads::Condition mCondition; - }; - - class WorkItem + class WorkItem : public osg::Referenced { public: WorkItem(); virtual ~WorkItem(); /// Override in a derived WorkItem to perform actual work. - /// By default, just signals the ticket that the work is done. - virtual void doWork(); + virtual void doWork() {} - osg::ref_ptr getTicket(); + bool isDone() const; + + /// Wait until the work is completed. Usually called from the main thread. + void waitTillDone(); + + /// Internal use by the WorkQueue. + void signalDone(); protected: - osg::ref_ptr mTicket; + OpenThreads::Atomic mDone; + OpenThreads::Mutex mMutex; + OpenThreads::Condition mCondition; }; class WorkQueue; + class WorkThread; + /// @brief A work queue that users can push work items onto, to be completed by one or more background threads. + /// @note Work items will be processed in the order that they were given in, however + /// if multiple work threads are involved then it is possible for a later item to complete before earlier items. + class WorkQueue : public osg::Referenced + { + public: + WorkQueue(int numWorkerThreads=1); + ~WorkQueue(); + + /// Add a new work item to the back of the queue. + /// @par The work item's waitTillDone() method may be used by the caller to wait until the work is complete. + void addWorkItem(osg::ref_ptr item); + + /// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added. + /// If the workqueue is in the process of being destroyed, may return NULL. + /// @par Used internally by the WorkThread. + osg::ref_ptr removeWorkItem(); + + private: + bool mIsReleased; + std::queue > mQueue; + + OpenThreads::Mutex mMutex; + OpenThreads::Condition mCondition; + + std::vector mThreads; + }; + + /// Internally used by WorkQueue. class WorkThread : public OpenThreads::Thread { public: @@ -56,35 +80,6 @@ namespace SceneUtil WorkQueue* mWorkQueue; }; - /// @brief A work queue that users can push work items onto, to be completed by one or more background threads. - class WorkQueue - { - public: - WorkQueue(int numWorkerThreads=1); - ~WorkQueue(); - - /// Add a new work item to the back of the queue. - /// @par The returned WorkTicket may be used by the caller to wait until the work is complete. - osg::ref_ptr addWorkItem(WorkItem* item); - - /// Get the next work item from the front of the queue. If the queue is empty, waits until a new item is added. - /// If the workqueue is in the process of being destroyed, may return NULL. - /// @note The caller must free the returned WorkItem - WorkItem* removeWorkItem(); - - void runThread(); - - private: - bool mIsReleased; - std::queue mQueue; - - OpenThreads::Mutex mMutex; - OpenThreads::Condition mCondition; - - std::vector mThreads; - }; - - }