#ifndef KITTY_THREAD_POOL_H #define KITTY_THREAD_POOL_H #include #include "task_pool.h" namespace util { /* * Allow threads to execute unhindered * while keeping full controll over the threads. */ class ThreadPool : public TaskPool { public: typedef TaskPool::__task __task; private: std::vector _thread; std::condition_variable _cv; std::mutex _lock; bool _continue; public: ThreadPool() : _continue { false } {} explicit ThreadPool(int threads) : _thread(threads), _continue { true } { for (auto &t : _thread) { t = std::thread(&ThreadPool::_main, this); } } ~ThreadPool() noexcept { if (!_continue) return; stop(); join(); } template auto push(Function && newTask, Args &&... args) { std::lock_guard lg(_lock); auto future = TaskPool::push(std::forward(newTask), std::forward(args)...); _cv.notify_one(); return future; } void pushDelayed(std::pair<__time_point, __task> &&task) { std::lock_guard lg(_lock); TaskPool::pushDelayed(std::move(task)); } template auto pushDelayed(Function &&newTask, std::chrono::duration duration, Args &&... args) { std::lock_guard lg(_lock); auto future = TaskPool::pushDelayed(std::forward(newTask), duration, std::forward(args)...); // Update all timers for wait_until _cv.notify_all(); return future; } void start(int threads) { _continue = true; _thread.resize(threads); for(auto &t : _thread) { t = std::thread(&ThreadPool::_main, this); } } void stop() { std::lock_guard lg(_lock); _continue = false; _cv.notify_all(); } void join() { for (auto & t : _thread) { t.join(); } } public: void _main() { while (_continue) { if(auto task = this->pop()) { (*task)->run(); } else { std::unique_lock uniq_lock(_lock); if(ready()) { continue; } if(!_continue) { break; } if(auto tp = next()) { _cv.wait_until(uniq_lock, *tp); } else { _cv.wait(uniq_lock); } } } // Execute remaining tasks while(auto task = this->pop()) { (*task)->run(); } } }; } #endif