Added a correct implementation of multi-threaded indexing using a thread

pool and a semaphore.

Fixed a couple more hotkey issues.
This commit is contained in:
casey 2016-07-08 16:40:42 -07:00
parent 92d0a3f7d7
commit 65203c8fbc
6 changed files with 124 additions and 140 deletions

View File

@ -1,6 +1,6 @@
# musikcube
a cross-platform audio engine written in C++.
a cross-platform audio engine and metadata indexer written in C++.
# musikbox

View File

@ -51,7 +51,7 @@
#include <boost/thread/xtime.hpp>
#include <boost/bind.hpp>
#define MULTI_THREADED_INDEXER 0
#define MULTI_THREADED_INDEXER 1
static const std::string TAG = "Indexer";
static const int MAX_THREADS = 10;
@ -78,17 +78,16 @@ static std::string normalizePath(const std::string& path) {
}
Indexer::Indexer(const std::string& libraryPath, const std::string& dbFilename)
: thread(NULL)
: thread(nullptr)
, status(0)
, restart(false)
, filesIndexed(0)
, filesSaved(0)
, maxReadThreads(MAX_THREADS) {
, prefs(Preferences::ForComponent(INDEXER_PREFS_COMPONENT))
, readSemaphore(prefs->GetInt(INDEXER_PREFS_MAX_TAG_READ_THREADS, MAX_THREADS)) {
this->dbFilename = dbFilename;
this->libraryPath = libraryPath;
this->prefs = Preferences::ForComponent(INDEXER_PREFS_COMPONENT);
this->thread = new boost::thread(boost::bind(&Indexer::ThreadLoop, this));
this->maxReadThreads = this->prefs->GetInt(INDEXER_PREFS_MAX_TAG_READ_THREADS, MAX_THREADS);
}
Indexer::~Indexer() {
@ -96,7 +95,7 @@ Indexer::~Indexer() {
this->Exit();
this->thread->join();
delete this->thread;
this->thread = NULL;
this->thread = nullptr;
}
}
@ -137,7 +136,7 @@ void Indexer::RemovePath(const std::string& path) {
this->Synchronize(true);
}
void Indexer::SynchronizeInternal() {
void Indexer::SynchronizeInternal(boost::asio::io_service* io) {
/* load all of the metadata (tag) reader plugins */
typedef PluginFactory::DestroyDeleter<IMetadataReader> MetadataDeleter;
typedef PluginFactory::DestroyDeleter<IDecoderFactory> DecoderDeleter;
@ -190,7 +189,7 @@ void Indexer::SynchronizeInternal() {
for(std::size_t i = 0; i < paths.size(); ++i) {
this->trackTransaction.reset(new db::ScopedTransaction(this->dbConnection));
std::string path = paths[i];
this->SyncDirectory(path, path, pathIds[i]);
this->SyncDirectory(io, path, path, pathIds[i]);
}
if (this->trackTransaction) {
@ -257,73 +256,52 @@ void Indexer::ReadMetadataFromFile(
musik::core::IndexerTrack track(0);
/* get cached filesize, parts, size, etc */
if (!track.NeedsToBeIndexed(file, this->dbConnection)) {
return;
}
if (track.NeedsToBeIndexed(file, this->dbConnection)) {
bool saveToDb = false;
bool saveToDb = false;
/* read the tag from the plugin */
typedef MetadataReaderList::iterator Iterator;
Iterator it = this->metadataReaders.begin();
while (it != this->metadataReaders.end()) {
if ((*it)->CanRead(track.GetValue("extension").c_str())) {
if ((*it)->Read(file.string().c_str(), &track)) {
saveToDb = true;
break;
}
}
it++;
}
/* read the tag from the plugin */
typedef MetadataReaderList::iterator Iterator;
Iterator it = this->metadataReaders.begin();
while (it != this->metadataReaders.end()) {
if ((*it)->CanRead(track.GetValue("extension").c_str())) {
if ((*it)->Read(file.string().c_str(), &track)) {
saveToDb = true;
break;
/* no tag? well... if a decoder can play it, add it to the database
with the file as the name. */
if (!saveToDb) {
std::string fullPath = file.string();
auto it = this->audioDecoders.begin();
while (it != this->audioDecoders.end()) {
if ((*it)->CanHandle(fullPath.c_str())) {
saveToDb = true;
track.SetValue("title", file.leaf().string().c_str());
break;
}
++it;
}
}
it++;
}
/* no tag? well... if a decoder can play it, add it to the database
with the file as the name. */
if (!saveToDb) {
std::string fullPath = file.string();
auto it = this->audioDecoders.begin();
while (it != this->audioDecoders.end()) {
if ((*it)->CanHandle(fullPath.c_str())) {
saveToDb = true;
track.SetValue("title", file.leaf().string().c_str());
break;
}
++it;
/* write it to the db, if read successfully */
if (saveToDb) {
track.SetValue("path_id", pathId.c_str());
track.Save(this->dbConnection, this->libraryPath);
this->filesSaved++;
}
}
/* write it to the db, if read successfully */
if (saveToDb) {
track.SetValue("path_id", pathId.c_str());
track.Save(this->dbConnection, this->libraryPath);
this->filesSaved++;
}
}
#if MULTI_THREADED_INDEXER
static inline void joinAndNotify(
std::vector<Thread>& threads,
std::shared_ptr<musik::core::db::ScopedTransaction> transaction,
sigslot::signal0<>& event,
std::atomic<size_t>& saved)
{
for (size_t i = 0; i < threads.size(); i++) {
threads.at(i)->join();
}
threads.clear();
if (saved.load() > 200) {
if (transaction) {
transaction->CommitAndRestart();
}
event();
saved.store(0);
}
}
#ifdef MULTI_THREADED_INDEXER
this->readSemaphore.post();
#endif
}
void Indexer::SyncDirectory(
boost::asio::io_service* io,
const std::string &syncRoot,
const std::string &currentPath,
DBID pathId)
@ -349,60 +327,37 @@ void Indexer::SyncDirectory(
std::string pathIdStr = boost::lexical_cast<std::string>(pathId);
std::vector<Thread> threads;
#if MULTI_THREADED_INDEXER
#define WAIT_FOR_ACTIVE() \
joinAndNotify( \
threads, \
this->trackTransaction, \
this->TrackRefreshed, \
this->filesSaved);
#endif
for( ; file != end && !this->Exited() && !this->Restarted(); file++) {
#if MULTI_THREADED_INDEXER
/* we do things in batches of 5. wait for this batch to
finish, then we'll spin up some more... */
if (threads.size() >= this->maxReadThreads) {
WAIT_FOR_ACTIVE();
if (this->filesSaved.load() > 200) {
if (this->trackTransaction) {
this->trackTransaction->CommitAndRestart();
}
this->TrackRefreshed();
this->filesSaved.store(0);
}
#endif
if (is_directory(file->status())) {
/* recursion here */
musik::debug::info(TAG, "scanning " + file->path().string());
#if MULTI_THREADED_INDEXER
WAIT_FOR_ACTIVE();
#endif
this->SyncDirectory(syncRoot, file->path().string(), pathId);
this->SyncDirectory(io, syncRoot, file->path().string(), pathId);
}
else {
++this->filesIndexed;
#if MULTI_THREADED_INDEXER
threads.push_back(Thread(new boost::thread(
boost::bind(
&Indexer::ReadMetadataFromFile,
this,
file->path(),
pathIdStr))));
#else
this->ReadMetadataFromFile(file->path(), pathIdStr);
if (io) {
this->readSemaphore.wait();
if (this->filesSaved.load() > 200) {
if (this->trackTransaction) {
this->trackTransaction->CommitAndRestart();
}
this->TrackRefreshed();
this->filesSaved.store(0);
io->post(boost::bind(
&Indexer::ReadMetadataFromFile,
this,
file->path(),
pathIdStr));
}
else {
this->ReadMetadataFromFile(file->path(), pathIdStr);
}
#endif
}
}
#if MULTI_THREADED_INDEXER
/* there may be a few left... */
WAIT_FOR_ACTIVE();
#endif
}
catch(...) {
}
@ -427,7 +382,27 @@ void Indexer::ThreadLoop() {
this->dbConnection.Open(this->dbFilename.c_str(), 0); /* ensure the db is open */
this->SynchronizeInternal();
#ifdef MULTI_THREADED_INDEXER
boost::asio::io_service io;
boost::thread_group threadPool;
boost::asio::io_service::work work(io);
/* initialize the thread pool -- we'll use this to index tracks in parallel. */
int threadCount = prefs->GetInt(INDEXER_PREFS_MAX_TAG_READ_THREADS, MAX_THREADS);
for (int i = 0; i < threadCount; i++) {
threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &io));
}
this->SynchronizeInternal(&io);
/* done with sync, remove all the threads in the pool to free resources. they'll
be re-created later if we index again. */
io.stop();
threadPool.join_all();
#else
this->SynchronizeInternal(nullptr);
#endif
this->RunAnalyzers();
{
@ -442,9 +417,10 @@ void Indexer::ThreadLoop() {
firstTime = false;
int waitTime = prefs->GetInt(INDEXER_PREFS_SYNC_TIMEOUT, 3600); /* sleep before we try again... */
/* sleep before we try again; disabled by default */
int waitTime = prefs->GetInt(INDEXER_PREFS_AUTO_SYNC_MILLIS, 0);
if (waitTime) {
if (waitTime > 0) {
boost::xtime waitTimeout;
boost::xtime_get(&waitTimeout, boost::TIME_UTC_);
waitTimeout.sec += waitTime;

View File

@ -46,6 +46,7 @@
#include <boost/thread/thread.hpp>
#include <boost/filesystem.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <deque>
#include <vector>
@ -53,7 +54,7 @@
#define INDEXER_PREFS_COMPONENT "indexer"
#define INDEXER_PREFS_SYNC_ON_STARTUP "SyncOnStartup"
#define INDEXER_PREFS_SYNC_TIMEOUT "SyncTimeout"
#define INDEXER_PREFS_AUTO_SYNC_MILLIS "AutoSyncIntervalMillis"
#define INDEXER_PREFS_REMOVE_MISSING_FILES "RemoveMissingFiles"
#define INDEXER_PREFS_MAX_TAG_READ_THREADS "MaxTagReadThreads"
@ -83,9 +84,10 @@ namespace musik { namespace core {
void SyncOptimize();
void RunAnalyzers();
void SynchronizeInternal();
void SynchronizeInternal(boost::asio::io_service* io);
void SyncDirectory(
boost::asio::io_service* io,
const std::string& syncRoot,
const std::string& currentPath,
DBID pathId);
@ -125,7 +127,7 @@ namespace musik { namespace core {
DecoderList audioDecoders;
std::shared_ptr<musik::core::Preferences> prefs;
std::shared_ptr<musik::core::db::ScopedTransaction> trackTransaction;
size_t maxReadThreads;
boost::interprocess::interprocess_semaphore readSemaphore;
};
typedef std::shared_ptr<Indexer> IndexerPtr;

View File

@ -83,25 +83,28 @@ void ListWindow::ScrollToBottom() {
}
void ListWindow::ScrollUp(int delta) {
ScrollPos spos = this->GetScrollPosition();
IScrollAdapter& adapter = this->GetScrollAdapter();
size_t first = spos.firstVisibleEntryIndex;
size_t last = first + spos.visibleEntryCount;
int drawIndex = first;
if (adapter.GetEntryCount() > 0) {
ScrollPos spos = this->GetScrollPosition();
int minIndex = 0;
int newIndex = this->selectedIndex - delta;
newIndex = std::max(newIndex, minIndex);
size_t first = spos.firstVisibleEntryIndex;
size_t last = first + spos.visibleEntryCount;
int drawIndex = first;
if (newIndex < (int) first + 1) {
drawIndex = newIndex - 1;
int minIndex = 0;
int newIndex = this->selectedIndex - delta;
newIndex = std::max(newIndex, minIndex);
if (newIndex < (int)first + 1) {
drawIndex = newIndex - 1;
}
drawIndex = std::max(0, drawIndex);
this->SetSelectedIndex(newIndex);
this->ScrollTo(drawIndex);
}
drawIndex = std::max(0, drawIndex);
this->SetSelectedIndex(newIndex);
this->ScrollTo(drawIndex);
}
void ListWindow::OnInvalidated() {
@ -109,23 +112,26 @@ void ListWindow::OnInvalidated() {
}
void ListWindow::ScrollDown(int delta) {
ScrollPos spos = this->GetScrollPosition();
IScrollAdapter& adapter = this->GetScrollAdapter();
size_t first = spos.firstVisibleEntryIndex;
size_t last = first + spos.visibleEntryCount;
size_t drawIndex = first;
if (adapter.GetEntryCount() > 0) {
ScrollPos spos = this->GetScrollPosition();
size_t maxIndex = adapter.GetEntryCount() - 1;
size_t newIndex = this->selectedIndex + delta;
newIndex = std::min(newIndex, maxIndex);
size_t first = spos.firstVisibleEntryIndex;
size_t last = first + spos.visibleEntryCount;
size_t drawIndex = first;
if (newIndex >= last - 1) {
drawIndex = drawIndex + delta;
size_t maxIndex = adapter.GetEntryCount() - 1;
size_t newIndex = this->selectedIndex + delta;
newIndex = std::min(newIndex, maxIndex);
if (newIndex >= last - 1) {
drawIndex = drawIndex + delta;
}
this->SetSelectedIndex(newIndex);
this->ScrollTo(drawIndex);
}
this->SetSelectedIndex(newIndex);
this->ScrollTo(drawIndex);
}
void ListWindow::PageUp() {

View File

@ -43,6 +43,8 @@
using namespace cursespp;
static const size_t INVALID_INDEX = (size_t) -1;
typedef IScrollAdapter::ScrollPosition ScrollPos;
#define REDRAW_VISIBLE_PAGE() \

View File

@ -395,8 +395,6 @@ void Window::Hide() {
}
void Window::Destroy() {
this->Hide();
if (this->frame) {
del_panel(this->framePanel);
delwin(this->frame);