Add ThreadGroup (derived from boost::thread_group), and remove

remaining boost-specific multi-threading usage.
This commit is contained in:
casey langen 2022-12-04 11:32:28 -08:00
parent 8d703e9737
commit a2b958ba48
5 changed files with 177 additions and 24 deletions

View File

@ -53,14 +53,12 @@
#include <musikcore/sdk/IAnalyzer.h>
#include <musikcore/sdk/IIndexerSource.h>
#include <musikcore/audio/Stream.h>
#include <musikcore/support/ThreadGroup.h>
#include <filesystem>
#include <algorithm>
#include <atomic>
#pragma warning(push, 0)
#include <boost/bind.hpp>
#pragma warning(pop)
#include <functional>
#define STRESS_TEST_DB 0
@ -85,7 +83,7 @@ using namespace musik::core::library;
using namespace musik::core::db;
using namespace musik::core::library::query;
using Thread = std::unique_ptr<boost::thread>;
using Thread = std::unique_ptr<std::thread>;
using TagReaderDestroyer = PluginFactory::ReleaseDeleter<ITagReader>;
using DecoderDeleter = PluginFactory::ReleaseDeleter<IDecoderFactory>;
@ -151,7 +149,7 @@ Indexer::~Indexer() {
void Indexer::Shutdown() {
if (this->thread) {
{
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
this->syncQueue.clear();
this->state = StateStopping;
@ -172,7 +170,7 @@ void Indexer::Schedule(SyncType type) {
}
void Indexer::Schedule(SyncType type, IIndexerSource* source) {
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
if (!this->thread) {
this->state = StateIdle;
@ -200,7 +198,7 @@ void Indexer::AddPath(const std::string& path) {
context.path = NormalizeDir(path);
{
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
if (std::find(this->paths.begin(), this->paths.end(), path) == this->paths.end()) {
this->paths.push_back(path);
@ -216,7 +214,7 @@ void Indexer::RemovePath(const std::string& path) {
context.path = NormalizeDir(path);
{
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
auto it = std::find(this->paths.begin(), this->paths.end(), path);
if (it != this->paths.end()) {
@ -492,7 +490,7 @@ void Indexer::SyncDirectory(
for (auto it : this->tagReaders) {
if (it->CanRead(extension.c_str())) {
if (io) {
io->post(boost::bind(
io->post(std::bind(
&Indexer::ReadMetadataFromFile,
this,
io,
@ -604,7 +602,7 @@ void Indexer::ThreadLoop() {
while (true) {
/* wait for some work. */
{
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
while (!this->Bail() && this->syncQueue.size() == 0) {
this->state = StateIdle;
this->waitCondition.wait(lock);
@ -629,12 +627,14 @@ void Indexer::ThreadLoop() {
if (threadCount > 1) {
boost::asio::io_service io;
boost::thread_group threadPool;
boost::asio::io_service::work work(io);
ThreadGroup threadGroup;
/* initialize the thread pool -- we'll use this to index tracks in parallel. */
for (int i = 0; i < threadCount; i++) {
threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &io));
threadGroup.create_thread([&io]() {
io.run();
});
}
this->Synchronize(context, &io);
@ -647,7 +647,8 @@ void Indexer::ThreadLoop() {
io.stop();
}
});
threadPool.join_all();
threadGroup.join_all();
}
else {
this->Synchronize(context, nullptr);
@ -800,7 +801,7 @@ void Indexer::SyncPlaylistTracksOrder() {
}
void Indexer::GetPaths(std::vector<std::string>& paths) {
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
std::copy(this->paths.begin(), this->paths.end(), std::back_inserter(paths));
}
@ -849,7 +850,7 @@ static int optimize(
++count;
}
boost::thread::yield();
std::this_thread::yield();
return count;
}
@ -863,8 +864,7 @@ void Indexer::SyncOptimize() {
}
void Indexer::ProcessAddRemoveQueue() {
boost::mutex::scoped_lock lock(this->stateMutex);
std::unique_lock<decltype(this->stateMutex)> lock(this->stateMutex);
while (!this->addRemoveQueue.empty()) {
AddRemoveContext context = this->addRemoveQueue.front();

View File

@ -41,18 +41,16 @@
#include <musikcore/sdk/IIndexerNotifier.h>
#include <musikcore/library/IIndexer.h>
#include <musikcore/support/Preferences.h>
#include <musikcore/support/ThreadGroup.h>
#pragma warning(push, 0)
#include <sigslot/sigslot.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/asio/io_service.hpp>
#pragma warning(pop)
#include <filesystem>
#include <thread>
#include <condition_variable>
#include <deque>
#include <vector>
#include <atomic>
@ -163,8 +161,8 @@ namespace musik { namespace core {
std::string libraryPath;
std::string dbFilename;
std::atomic<State> state;
boost::mutex stateMutex;
boost::condition waitCondition;
std::mutex stateMutex;
std::condition_variable_any waitCondition;
std::unique_ptr<std::thread> thread;
std::atomic<int> incrementalUrisScanned, totalUrisScanned;
std::deque<AddRemoveContext> addRemoveQueue;

View File

@ -729,6 +729,7 @@
<ClInclude Include="support\Playback.h" />
<ClInclude Include="support\PreferenceKeys.h" />
<ClInclude Include="support\Preferences.h" />
<ClInclude Include="support\ThreadGroup.h" />
<ClInclude Include="utfutil.h" />
<ClInclude Include="version.h" />
</ItemGroup>

View File

@ -666,5 +666,8 @@
<ClInclude Include="support\PiggyDebugBackend.h">
<Filter>src\support</Filter>
</ClInclude>
<ClInclude Include="support\ThreadGroup.h">
<Filter>src\support</Filter>
</ClInclude>
</ItemGroup>
</Project>

View File

@ -0,0 +1,151 @@
//////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2004-2021 musikcube team
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
/* copied from boost's ThreadGroup.hpp */
#pragma once
#include <list>
#include <thread>
#include <shared_mutex>
#include <mutex>
#include <memory>
namespace musik { namespace core {
class ThreadGroup
{
private:
ThreadGroup(ThreadGroup const&);
ThreadGroup& operator=(ThreadGroup const&);
public:
ThreadGroup() {}
~ThreadGroup()
{
for (std::list<std::thread*>::iterator it = threads.begin(), end = threads.end();
it != end;
++it)
{
delete* it;
}
}
bool is_this_thread_in()
{
std::thread::id id = std::this_thread::get_id();
std::shared_lock<std::shared_mutex> guard(m);
for (std::list<std::thread*>::iterator it = threads.begin(), end = threads.end();
it != end;
++it)
{
if ((*it)->get_id() == id)
return true;
}
return false;
}
bool is_thread_in(std::thread* thrd)
{
if (thrd)
{
std::thread::id id = thrd->get_id();
std::shared_lock<std::shared_mutex> guard(m);
for (std::list<std::thread*>::iterator it = threads.begin(), end = threads.end();
it != end;
++it)
{
if ((*it)->get_id() == id)
return true;
}
return false;
}
else
{
return false;
}
}
template<typename F>
std::thread* create_thread(F threadfunc)
{
std::lock_guard<std::shared_mutex> guard(m);
std::unique_ptr<std::thread> new_thread(new std::thread(threadfunc));
threads.push_back(new_thread.get());
return new_thread.release();
}
void add_thread(std::thread* thrd)
{
if (thrd)
{
std::lock_guard<std::shared_mutex> guard(m);
threads.push_back(thrd);
}
}
void remove_thread(std::thread* thrd)
{
std::lock_guard<std::shared_mutex> guard(m);
std::list<std::thread*>::iterator const it = std::find(threads.begin(), threads.end(), thrd);
if (it != threads.end())
{
threads.erase(it);
}
}
void join_all()
{
std::shared_lock<std::shared_mutex> guard(m);
for (std::list<std::thread*>::iterator it = threads.begin(), end = threads.end();
it != end;
++it)
{
if ((*it)->joinable())
(*it)->join();
}
}
size_t size() const
{
std::shared_lock<std::shared_mutex> guard(m);
return threads.size();
}
private:
std::list<std::thread*> threads;
mutable std::shared_mutex m;
};
} }