- Removed CachedStatement -- it's a good idea, but implementation is a

bit faulty.
- Added track indexing parallelization to Indexer -- read up to 8 files
  at a time (value to be tweaked). Results in much faster indexing for
  large collections.
This commit is contained in:
casey 2016-07-04 15:23:13 -07:00
parent c44180db63
commit 5916bc11b2
12 changed files with 106 additions and 344 deletions

View File

@ -4,7 +4,6 @@ set(CORE_SOURCES
./audio/Player.cpp ./audio/Player.cpp
./audio/Stream.cpp ./audio/Stream.cpp
./audio/GaplessTransport.cpp ./audio/GaplessTransport.cpp
./db/CachedStatement.cpp
./db/Connection.cpp ./db/Connection.cpp
./db/ScopedTransaction.cpp ./db/ScopedTransaction.cpp
./db/Statement.cpp ./db/Statement.cpp

View File

@ -100,7 +100,6 @@
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader> <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Create</PrecompiledHeader> <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Create</PrecompiledHeader>
</ClCompile> </ClCompile>
<ClCompile Include="db\CachedStatement.cpp" />
<ClCompile Include="db\Connection.cpp" /> <ClCompile Include="db\Connection.cpp" />
<ClCompile Include="db\ScopedTransaction.cpp" /> <ClCompile Include="db\ScopedTransaction.cpp" />
<ClCompile Include="db\Statement.cpp" /> <ClCompile Include="db\Statement.cpp" />
@ -148,7 +147,6 @@
<ClInclude Include="sdk\IBufferProvider.h" /> <ClInclude Include="sdk\IBufferProvider.h" />
<ClInclude Include="sdk\IPlugin.h" /> <ClInclude Include="sdk\IPlugin.h" />
<ClInclude Include="sdk\IMetadataWriter.h" /> <ClInclude Include="sdk\IMetadataWriter.h" />
<ClInclude Include="db\CachedStatement.h" />
<ClInclude Include="db\Connection.h" /> <ClInclude Include="db\Connection.h" />
<ClInclude Include="db\dbconfig.h" /> <ClInclude Include="db\dbconfig.h" />
<ClInclude Include="db\ScopedTransaction.h" /> <ClInclude Include="db\ScopedTransaction.h" />

View File

@ -40,9 +40,6 @@
<ClCompile Include="pch.cpp"> <ClCompile Include="pch.cpp">
<Filter>src</Filter> <Filter>src</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="db\CachedStatement.cpp">
<Filter>src\db</Filter>
</ClCompile>
<ClCompile Include="db\Connection.cpp"> <ClCompile Include="db\Connection.cpp">
<Filter>src\db</Filter> <Filter>src\db</Filter>
</ClCompile> </ClCompile>
@ -120,9 +117,6 @@
<ClInclude Include="pch.hpp"> <ClInclude Include="pch.hpp">
<Filter>src</Filter> <Filter>src</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="db\CachedStatement.h">
<Filter>src\db</Filter>
</ClInclude>
<ClInclude Include="db\Connection.h"> <ClInclude Include="db\Connection.h">
<Filter>src\db</Filter> <Filter>src\db</Filter>
</ClInclude> </ClInclude>

View File

@ -1,74 +0,0 @@
//////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2007-2016 musikcube team
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#include "pch.hpp"
#include <core/db/CachedStatement.h>
#include <core/db/Statement.h>
#include <core/db/Connection.h>
#include <sqlite/sqlite3.h>
using namespace musik::core::db;
//////////////////////////////////////////
///\brief
///Constructor
///
///\param sql
///SQL
///
///\param connection
///Connection to execute the statement on
//////////////////////////////////////////
CachedStatement::CachedStatement(const char* sql,Connection &connection) : Statement(connection){
this->sqlStatement.assign(sql);
this->stmt = this->connection->GetCachedStatement(sql);
}
//////////////////////////////////////////
///\brief
///Destructor
///
///Will return the cached statement to the Connection
///
///\see
///musik::core::db::Connection::ReturnCachedStatement
//////////////////////////////////////////
CachedStatement::~CachedStatement(){
sqlite3_reset(this->stmt);
sqlite3_clear_bindings(this->stmt);
this->connection->ReturnCachedStatement(this->sqlStatement.c_str(),this->stmt);
this->stmt=NULL;
}

View File

@ -1,71 +0,0 @@
//////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2007-2016 musikcube team
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the author nor the names of other contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////////
#pragma once
#include <core/config.h>
#include <core/db/Statement.h>
namespace musik{ namespace core{ namespace db{
// Forward declare
class Statement;
//////////////////////////////////////////
///\brief
///Same as Statement, but keeps the statement in a cache when destructed
///
///Be careful using this class only on "static" SQL statement
///This means that you should not use this on statements looking like
///this: CacheStatement("SELECT * FROM mytable WHERE id="+id,db);
///since the cache will grow for each id in this example
///
///Instead, do like this:
///CacheStatement("SELECT * FROM mytable WHERE id=?",db);
///And use the BindInt to set the id
///
///\see
///musik::core::db::Statement
//////////////////////////////////////////
class CachedStatement : public Statement{
public:
CachedStatement(const char* sql,Connection &connection);
~CachedStatement();
private:
std::string sqlStatement;
};
} } }

View File

@ -43,27 +43,17 @@ using namespace musik::core::db;
boost::mutex Connection::globalMutex; boost::mutex Connection::globalMutex;
////////////////////////////////////////// Connection::Connection()
///\brief : connection(nullptr)
///Constructor , transactionCounter(0) {
//////////////////////////////////////////
Connection::Connection() : connection(NULL),transactionCounter(0) {
this->Maintenance(true); this->Maintenance(true);
} }
//////////////////////////////////////////
///\brief
///Destructor
///
///Will automatically close the connection if it's not closed before
//////////////////////////////////////////
Connection::~Connection(){ Connection::~Connection(){
this->Close(); this->Close();
this->Maintenance(false); this->Maintenance(false);
} }
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Open a connection to the database ///Open a connection to the database
@ -80,19 +70,19 @@ Connection::~Connection(){
///\returns ///\returns
///Error code returned by SQLite ///Error code returned by SQLite
////////////////////////////////////////// //////////////////////////////////////////
int Connection::Open(const char *database,unsigned int options,unsigned int cache){ int Connection::Open(const char *database, unsigned int options, unsigned int cache) {
// sqlite3_enable_shared_cache(1);
int error; int error;
#ifdef UTF_WIDECHAR
error = sqlite3_open16(database,&this->connection);
#else
error = sqlite3_open(database,&this->connection);
#endif
if(error==SQLITE_OK){ #ifdef UTF_WIDECHAR
error = sqlite3_open16(database,&this->connection);
#else
error = sqlite3_open(database,&this->connection);
#endif
if (error==SQLITE_OK) {
this->Initialize(cache); this->Initialize(cache);
} }
return error; return error;
} }
@ -116,19 +106,18 @@ int Connection::Open(const std::string &database,unsigned int options,unsigned i
int error; int error;
#ifdef WIN32 #ifdef WIN32
std::wstring wdatabase = u8to16(database); std::wstring wdatabase = u8to16(database);
error = sqlite3_open16(wdatabase.c_str(),&this->connection); error = sqlite3_open16(wdatabase.c_str(),&this->connection);
#else #else
error = sqlite3_open(database.c_str(),&this->connection); error = sqlite3_open(database.c_str(),&this->connection);
#endif #endif
if(error==SQLITE_OK){ if (error==SQLITE_OK) {
this->Initialize(cache); this->Initialize(cache);
} }
return error; return error;
} }
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Close connection to the database ///Close connection to the database
@ -136,23 +125,15 @@ int Connection::Open(const std::string &database,unsigned int options,unsigned i
///\returns ///\returns
///Errorcode ( musik::core::db::ReturnCode ) ///Errorcode ( musik::core::db::ReturnCode )
////////////////////////////////////////// //////////////////////////////////////////
int Connection::Close(){ int Connection::Close() {
if (sqlite3_close(this->connection) == SQLITE_OK) {
// Clear the cache this->connection = 0;
for(StatementCache::iterator stmt=this->cachedStatements.begin();stmt!=this->cachedStatements.end();++stmt){
sqlite3_finalize(stmt->second);
}
this->cachedStatements.clear();
if(sqlite3_close(this->connection)==SQLITE_OK){
this->connection = 0;
return musik::core::db::Okay; return musik::core::db::Okay;
} }
return musik::core::db::Error; return musik::core::db::Error;
} }
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Execute a SQL string ///Execute a SQL string
@ -166,7 +147,7 @@ int Connection::Close(){
///\see ///\see
///musik::core::db::ReturnCode ///musik::core::db::ReturnCode
////////////////////////////////////////// //////////////////////////////////////////
int Connection::Execute(const char* sql){ int Connection::Execute(const char* sql) {
sqlite3_stmt *stmt = NULL; sqlite3_stmt *stmt = NULL;
// Prepaire seems to give errors when interrupted // Prepaire seems to give errors when interrupted
@ -187,6 +168,7 @@ int Connection::Execute(const char* sql){
sqlite3_reset(stmt); sqlite3_reset(stmt);
sqlite3_finalize(stmt); sqlite3_finalize(stmt);
return musik::core::db::Okay; return musik::core::db::Okay;
} }
@ -204,7 +186,7 @@ int Connection::Execute(const char* sql){
///\see ///\see
///musik::core::db::ReturnCode ///musik::core::db::ReturnCode
////////////////////////////////////////// //////////////////////////////////////////
int Connection::Execute(const wchar_t* sql){ int Connection::Execute(const wchar_t* sql) {
sqlite3_stmt *stmt = NULL; sqlite3_stmt *stmt = NULL;
{ {
boost::mutex::scoped_lock lock(this->mutex); boost::mutex::scoped_lock lock(this->mutex);
@ -216,7 +198,7 @@ int Connection::Execute(const wchar_t* sql){
} }
// Execute the statement // Execute the statement
int error = this->StepStatement(stmt); int error = this->StepStatement(stmt);
if(error!=SQLITE_OK && error!=SQLITE_DONE){ if(error!=SQLITE_OK && error!=SQLITE_DONE){
sqlite3_finalize(stmt); sqlite3_finalize(stmt);
return db::Error; return db::Error;
@ -232,7 +214,6 @@ void Connection::Analyze(){
// this->Execute("ANALYZE"); // this->Execute("ANALYZE");
} }
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Get the last inserted row ID ///Get the last inserted row ID
@ -244,10 +225,9 @@ void Connection::Analyze(){
///http://www.sqlite.org/c3ref/last_insert_rowid.html ///http://www.sqlite.org/c3ref/last_insert_rowid.html
////////////////////////////////////////// //////////////////////////////////////////
int Connection::LastInsertedId(){ int Connection::LastInsertedId(){
return (int)sqlite3_last_insert_rowid(this->connection); return (int) sqlite3_last_insert_rowid(this->connection);
} }
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Initializes the database. ///Initializes the database.
@ -282,65 +262,6 @@ void Connection::Initialize(unsigned int cache){
} }
//////////////////////////////////////////
///\brief
///Internal method used by the CachedStatement to locate if a statement already exists
///
///\param sql
///SQL to check for
///
///\returns
///The cached or newly created statement
///
///\see
///musik::core::db::CachedStatment
//////////////////////////////////////////
sqlite3_stmt *Connection::GetCachedStatement(const char* sql){
sqlite3_stmt *newStmt(NULL);
StatementCache::iterator stmt = this->cachedStatements.find(sql);
if(stmt==this->cachedStatements.end()){
boost::mutex::scoped_lock lock(this->mutex);
int err = sqlite3_prepare_v2(this->connection,sql,-1,&newStmt,NULL);
if(err!=SQLITE_OK){
return NULL;
}
return newStmt;
}
newStmt = stmt->second;
this->cachedStatements.erase(stmt);
return newStmt;
}
//////////////////////////////////////////
///\brief
///Used by CachedStatement when destructed to return it's statement.
///
///\param sql
///SQL string
///
///\param stmt
///Statement to return
///
///\see
///musik::core::db::CachedStatment
//////////////////////////////////////////
void Connection::ReturnCachedStatement(const char* sql,sqlite3_stmt *stmt){
StatementCache::iterator cacheStmt = this->cachedStatements.find(sql);
if(cacheStmt==this->cachedStatements.end()){
// Insert the stmt in cache
this->cachedStatements[sql] = stmt;
}else{
// Stmt already exists. Finalize it
DB_ASSERT(sqlite3_finalize(stmt));
}
}
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
///Interrupts the current running statement(s) ///Interrupts the current running statement(s)

View File

@ -76,20 +76,12 @@ namespace musik{ namespace core{ namespace db{
void Initialize(unsigned int cache); void Initialize(unsigned int cache);
typedef std::map<std::string,sqlite3_stmt*> StatementCache;
StatementCache cachedStatements;
friend class Statement; friend class Statement;
friend class CachedStatement;
friend class ScopedTransaction; friend class ScopedTransaction;
sqlite3_stmt *GetCachedStatement(const char* sql);
void ReturnCachedStatement(const char* sql,sqlite3_stmt *stmt);
int StepStatement(sqlite3_stmt *stmt); int StepStatement(sqlite3_stmt *stmt);
int transactionCounter; int transactionCounter;
sqlite3 *connection; sqlite3 *connection;
boost::mutex mutex; boost::mutex mutex;

View File

@ -48,8 +48,6 @@ struct sqlite3_stmt;
namespace musik{ namespace core{ namespace db{ namespace musik{ namespace core{ namespace db{
class Connection; class Connection;
class CachedStatement;
////////////////////////////////////////// //////////////////////////////////////////
///\brief ///\brief
@ -87,9 +85,5 @@ namespace musik{ namespace core{ namespace db{
Statement(Connection &connection); Statement(Connection &connection);
}; };
} } } } } }
#include <core/db/CachedStatement.h>

View File

@ -52,11 +52,14 @@
#include <boost/bind.hpp> #include <boost/bind.hpp>
static const std::string TAG = "Indexer"; static const std::string TAG = "Indexer";
static const int MAX_THREADS = 8;
using namespace musik::core; using namespace musik::core;
using namespace musik::core::metadata; using namespace musik::core::metadata;
using namespace musik::core::audio; using namespace musik::core::audio;
using Thread = std::unique_ptr<boost::thread>;
static std::string normalizeDir(std::string path) { static std::string normalizeDir(std::string path) {
path = boost::filesystem::path(path).make_preferred().string(); path = boost::filesystem::path(path).make_preferred().string();
@ -84,12 +87,6 @@ Indexer::Indexer(const std::string& libraryPath, const std::string& dbFilename)
this->thread = new boost::thread(boost::bind(&Indexer::ThreadLoop, this)); this->thread = new boost::thread(boost::bind(&Indexer::ThreadLoop, this));
} }
//////////////////////////////////////////
///\brief
///Destructor
///
///Exits and joins threads
//////////////////////////////////////////
Indexer::~Indexer() { Indexer::~Indexer() {
if (this->thread) { if (this->thread) {
this->Exit(); this->Exit();
@ -99,38 +96,17 @@ Indexer::~Indexer() {
} }
} }
//////////////////////////////////////////
///\brief
///Restart the sync
///
///\param bNewRestart
///Should if be restarted or not
//////////////////////////////////////////
void Indexer::Synchronize(bool restart) { void Indexer::Synchronize(bool restart) {
boost::mutex::scoped_lock lock(this->exitMutex); boost::mutex::scoped_lock lock(this->exitMutex);
this->restart = restart; this->restart = restart;
this->Notify(); this->Notify();
} }
//////////////////////////////////////////
///\brief
///Should the sync be restarted?
//////////////////////////////////////////
bool Indexer::Restarted() { bool Indexer::Restarted() {
boost::mutex::scoped_lock lock(this->exitMutex); boost::mutex::scoped_lock lock(this->exitMutex);
return this->restart; return this->restart;
} }
//////////////////////////////////////////
///\brief
///Add a new path to the Indexer.
///
///\param sPath
///Path to add
///
///\remarks
///If the path already exists it will not be added.
//////////////////////////////////////////
void Indexer::AddPath(const std::string& path) { void Indexer::AddPath(const std::string& path) {
Indexer::AddRemoveContext context; Indexer::AddRemoveContext context;
context.add = true; context.add = true;
@ -144,13 +120,6 @@ void Indexer::AddPath(const std::string& path) {
this->Synchronize(true); this->Synchronize(true);
} }
//////////////////////////////////////////
///\brief
///Remove a path from the Indexer
///
///\param sPath
///Path to remove
//////////////////////////////////////////
void Indexer::RemovePath(const std::string& path) { void Indexer::RemovePath(const std::string& path) {
Indexer::AddRemoveContext context; Indexer::AddRemoveContext context;
context.add = false; context.add = false;
@ -164,10 +133,6 @@ void Indexer::RemovePath(const std::string& path) {
this->Synchronize(true); this->Synchronize(true);
} }
//////////////////////////////////////////
///\brief
///Main method for doing the synchronization.
//////////////////////////////////////////
void Indexer::SynchronizeInternal() { void Indexer::SynchronizeInternal() {
/* load all of the metadata (tag) reader plugins */ /* load all of the metadata (tag) reader plugins */
typedef PluginFactory::DestroyDeleter<IMetadataReader> MetadataDeleter; typedef PluginFactory::DestroyDeleter<IMetadataReader> MetadataDeleter;
@ -252,7 +217,7 @@ void Indexer::SynchronizeInternal() {
} }
if(!this->Restarted() && !this->Exited()){ if (!this->Restarted() && !this->Exited()){
this->SyncCleanup(); this->SyncCleanup();
} }
@ -265,7 +230,7 @@ void Indexer::SynchronizeInternal() {
this->status = 5; this->status = 5;
} }
if(!this->Restarted() && !this->Exited()){ if (!this->Restarted() && !this->Exited()){
this->SyncOptimize(); this->SyncOptimize();
} }
@ -282,13 +247,13 @@ void Indexer::SynchronizeInternal() {
} }
void Indexer::ReadMetadataFromFile( void Indexer::ReadMetadataFromFile(
const boost::filesystem::directory_iterator file, const boost::filesystem::path& file,
const std::string& pathId) const std::string& pathId)
{ {
musik::core::IndexerTrack track(0); musik::core::IndexerTrack track(0);
/* get cached filesize, parts, size, etc */ /* get cached filesize, parts, size, etc */
if (!track.NeedsToBeIndexed(file->path(), this->dbConnection)) { if (!track.NeedsToBeIndexed(file, this->dbConnection)) {
return; return;
} }
@ -299,7 +264,7 @@ void Indexer::ReadMetadataFromFile(
Iterator it = this->metadataReaders.begin(); Iterator it = this->metadataReaders.begin();
while (it != this->metadataReaders.end()) { while (it != this->metadataReaders.end()) {
if ((*it)->CanRead(track.GetValue("extension").c_str())) { if ((*it)->CanRead(track.GetValue("extension").c_str())) {
if ((*it)->Read(file->path().string().c_str(), &track)) { if ((*it)->Read(file.string().c_str(), &track)) {
saveToDb = true; saveToDb = true;
break; break;
} }
@ -310,12 +275,12 @@ void Indexer::ReadMetadataFromFile(
/* no tag? well... if a decoder can play it, add it to the database /* no tag? well... if a decoder can play it, add it to the database
with the file as the name. */ with the file as the name. */
if (!saveToDb) { if (!saveToDb) {
std::string fullPath = file->path().string(); std::string fullPath = file.string();
auto it = this->audioDecoders.begin(); auto it = this->audioDecoders.begin();
while (it != this->audioDecoders.end()) { while (it != this->audioDecoders.end()) {
if ((*it)->CanHandle(fullPath.c_str())) { if ((*it)->CanHandle(fullPath.c_str())) {
saveToDb = true; saveToDb = true;
track.SetValue("title", file->path().leaf().string().c_str()); track.SetValue("title", file.leaf().string().c_str());
break; break;
} }
++it; ++it;
@ -326,15 +291,30 @@ void Indexer::ReadMetadataFromFile(
if (saveToDb) { if (saveToDb) {
track.SetValue("path_id", pathId.c_str()); track.SetValue("path_id", pathId.c_str());
track.Save(this->dbConnection, this->libraryPath); track.Save(this->dbConnection, this->libraryPath);
}
}
this->filesSaved++; static inline void joinAndNotify(
if (this->filesSaved % 200 == 0) { std::vector<Thread>& threads,
if (this->trackTransaction) { std::shared_ptr<musik::core::db::ScopedTransaction> transaction,
this->trackTransaction->CommitAndRestart(); sigslot::signal0<>& event,
} size_t& total)
{
total += threads.size();
this->TrackRefreshed(); for (size_t i = 0; i < threads.size(); i++) {
threads.at(i)->join();
}
threads.clear();
if (total > 200) {
if (transaction) {
transaction->CommitAndRestart();
} }
event();
total = 0;
} }
} }
@ -362,21 +342,47 @@ void Indexer::SyncDirectory(
boost::filesystem::directory_iterator file(path); boost::filesystem::directory_iterator file(path);
std::string pathIdStr = boost::lexical_cast<std::string>(pathId); std::string pathIdStr = boost::lexical_cast<std::string>(pathId);
std::vector<Thread> threads;
#define WAIT_FOR_ACTIVE() \
joinAndNotify( \
threads, \
this->trackTransaction, \
this->TrackRefreshed, \
this->filesSaved);
for( ; file != end && !this->Exited() && !this->Restarted(); file++) { for( ; file != end && !this->Exited() && !this->Restarted(); file++) {
/* we do things in batches of 5. wait for this batch to
finish, then we'll spin up some more... */
if (threads.size() >= MAX_THREADS) {
WAIT_FOR_ACTIVE();
}
if (is_directory(file->status())) { if (is_directory(file->status())) {
/* recursion here */ /* recursion here */
musik::debug::info(TAG, "scanning " + file->path().string()); musik::debug::info(TAG, "scanning " + file->path().string());
WAIT_FOR_ACTIVE();
this->SyncDirectory(syncRoot, file->path().string(), pathId); this->SyncDirectory(syncRoot, file->path().string(), pathId);
} }
else { else {
++this->filesIndexed; ++this->filesIndexed;
this->ReadMetadataFromFile(file, pathIdStr);
threads.push_back(Thread(new boost::thread(
boost::bind(
&Indexer::ReadMetadataFromFile,
this,
file->path(),
pathIdStr))));
} }
} }
/* there may be a few left... */
WAIT_FOR_ACTIVE();
} }
catch(...) { catch(...) {
} }
#undef WAIT_FOR_ACTIVE()
} }
void Indexer::ThreadLoop() { void Indexer::ThreadLoop() {

View File

@ -45,6 +45,7 @@
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/asio/io_service.hpp>
#include <deque> #include <deque>
#include <vector> #include <vector>
@ -88,7 +89,7 @@ namespace musik { namespace core {
DBID pathId); DBID pathId);
void ReadMetadataFromFile( void ReadMetadataFromFile(
const boost::filesystem::directory_iterator path, const boost::filesystem::path& path,
const std::string& pathId); const std::string& pathId);
db::Connection dbConnection; db::Connection dbConnection;
@ -102,8 +103,8 @@ namespace musik { namespace core {
boost::thread *thread; boost::thread *thread;
boost::mutex progressMutex; boost::mutex progressMutex;
int filesIndexed; size_t filesIndexed;
int filesSaved; size_t filesSaved;
struct AddRemoveContext { struct AddRemoveContext {
bool add; bool add;

View File

@ -43,6 +43,7 @@
#include <core/io/DataStreamFactory.h> #include <core/io/DataStreamFactory.h>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
using namespace musik::core; using namespace musik::core;
@ -56,6 +57,8 @@ using namespace musik::core;
#define ARTIST_TRACK_JUNCTION_TABLE_NAME "track_artists" #define ARTIST_TRACK_JUNCTION_TABLE_NAME "track_artists"
#define ARTIST_TRACK_FOREIGN_KEY "artist_id" #define ARTIST_TRACK_FOREIGN_KEY "artist_id"
static boost::mutex trackWriteLock;
IndexerTrack::IndexerTrack(DBID id) IndexerTrack::IndexerTrack(DBID id)
: internalMetadata(new IndexerTrack::MetadataWithThumbnail()) : internalMetadata(new IndexerTrack::MetadataWithThumbnail())
, id(id) , id(id)
@ -147,7 +150,7 @@ bool IndexerTrack::NeedsToBeIndexed(
this->SetValue("filesize", boost::lexical_cast<std::string>(fileSize).c_str()); this->SetValue("filesize", boost::lexical_cast<std::string>(fileSize).c_str());
this->SetValue("filetime", boost::lexical_cast<std::string>(fileTime).c_str()); this->SetValue("filetime", boost::lexical_cast<std::string>(fileTime).c_str());
db::CachedStatement stmt( db::Statement stmt(
"SELECT id, filename, filesize, filetime " \ "SELECT id, filename, filesize, filetime " \
"FROM tracks t " \ "FROM tracks t " \
"WHERE filename=?", dbConnection); "WHERE filename=?", dbConnection);
@ -176,7 +179,7 @@ static DBID writeToTracksTable(
db::Connection &dbConnection, db::Connection &dbConnection,
IndexerTrack& track) IndexerTrack& track)
{ {
db::CachedStatement stmt("INSERT OR REPLACE INTO tracks " \ db::Statement stmt("INSERT OR REPLACE INTO tracks " \
"(id, track, disc, bpm, duration, filesize, year, title, filename, filetime, path_id) " \ "(id, track, disc, bpm, duration, filesize, year, title, filename, filetime, path_id) " \
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", dbConnection); "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", dbConnection);
@ -210,7 +213,7 @@ static void removeRelation(
DBID trackId) DBID trackId)
{ {
std::string query = boost::str(boost::format("DELETE FROM %1% WHERE track_id=?") % field); std::string query = boost::str(boost::format("DELETE FROM %1% WHERE track_id=?") % field);
db::CachedStatement stmt(query.c_str(), connection); db::Statement stmt(query.c_str(), connection);
stmt.BindInt(0, trackId); stmt.BindInt(0, trackId);
stmt.Step(); stmt.Step();
} }
@ -239,7 +242,7 @@ DBID IndexerTrack::SaveThumbnail(db::Connection& connection, const std::string&
if (this->internalMetadata->thumbnailData) { if (this->internalMetadata->thumbnailData) {
uint64 sum = Checksum(this->internalMetadata->thumbnailData, this->internalMetadata->thumbnailSize); uint64 sum = Checksum(this->internalMetadata->thumbnailData, this->internalMetadata->thumbnailSize);
db::CachedStatement thumbs("SELECT id FROM thumbnails WHERE filesize=? AND checksum=?", connection); db::Statement thumbs("SELECT id FROM thumbnails WHERE filesize=? AND checksum=?", connection);
thumbs.BindInt(0, this->internalMetadata->thumbnailSize); thumbs.BindInt(0, this->internalMetadata->thumbnailSize);
thumbs.BindInt(1, sum); thumbs.BindInt(1, sum);
@ -280,11 +283,11 @@ void IndexerTrack::ProcessNonStandardMetadata(db::Connection& connection) {
MetadataMap unknownFields(this->internalMetadata->metadata); MetadataMap unknownFields(this->internalMetadata->metadata);
removeKnownFields(unknownFields); removeKnownFields(unknownFields);
db::CachedStatement selectMetaKey("SELECT id FROM meta_keys WHERE name=?", connection); db::Statement selectMetaKey("SELECT id FROM meta_keys WHERE name=?", connection);
db::CachedStatement selectMetaValue("SELECT id FROM meta_values WHERE meta_key_id=? AND content=?", connection); db::Statement selectMetaValue("SELECT id FROM meta_values WHERE meta_key_id=? AND content=?", connection);
db::CachedStatement insertMetaValue("INSERT INTO meta_values (meta_key_id,content) VALUES (?,?)", connection); db::Statement insertMetaValue("INSERT INTO meta_values (meta_key_id,content) VALUES (?,?)", connection);
db::CachedStatement insertTrackMeta("INSERT INTO track_meta (track_id,meta_value_id) VALUES (?,?)", connection); db::Statement insertTrackMeta("INSERT INTO track_meta (track_id,meta_value_id) VALUES (?,?)", connection);
db::CachedStatement insertMetaKey("INSERT INTO meta_keys (name) VALUES (?)", connection); db::Statement insertMetaKey("INSERT INTO meta_keys (name) VALUES (?)", connection);
MetadataMap::const_iterator it = unknownFields.begin(); MetadataMap::const_iterator it = unknownFields.begin();
for ( ; it != unknownFields.end(); ++it){ for ( ; it != unknownFields.end(); ++it){
@ -357,7 +360,7 @@ DBID IndexerTrack::SaveSingleValueField(
std::string selectQuery = boost::str(boost::format( std::string selectQuery = boost::str(boost::format(
"SELECT id FROM %1% WHERE name=?") % fieldTableName); "SELECT id FROM %1% WHERE name=?") % fieldTableName);
db::CachedStatement stmt(selectQuery.c_str(), dbConnection); db::Statement stmt(selectQuery.c_str(), dbConnection);
std::string value = this->GetValue(trackMetadataKeyName.c_str()); std::string value = this->GetValue(trackMetadataKeyName.c_str());
stmt.BindText(0, value); stmt.BindText(0, value);
@ -450,7 +453,7 @@ DBID IndexerTrack::SaveArtist(db::Connection& dbConnection) {
} }
bool IndexerTrack::Save(db::Connection &dbConnection, std::string libraryDirectory) { bool IndexerTrack::Save(db::Connection &dbConnection, std::string libraryDirectory) {
db::ScopedTransaction transaction(dbConnection); boost::mutex::scoped_lock lock(trackWriteLock);
if (this->GetValue("album_artist") == "") { if (this->GetValue("album_artist") == "") {
this->SetValue("album_artist", this->GetValue("artist").c_str()); this->SetValue("album_artist", this->GetValue("artist").c_str());
@ -477,7 +480,7 @@ bool IndexerTrack::Save(db::Connection &dbConnection, std::string libraryDirecto
/* update all of the track foreign keys */ /* update all of the track foreign keys */
{ {
db::CachedStatement stmt( db::Statement stmt(
"UPDATE tracks " \ "UPDATE tracks " \
"SET album_id=?, visual_genre_id=?, visual_artist_id=?, album_artist_id=?, thumbnail_id=? " \ "SET album_id=?, visual_genre_id=?, visual_artist_id=?, album_artist_id=?, thumbnail_id=? " \
"WHERE id=?", dbConnection); "WHERE id=?", dbConnection);
@ -510,7 +513,7 @@ DBID IndexerTrack::SaveNormalizedFieldValue(
{ {
std::string query = boost::str(boost::format("SELECT id FROM %1% WHERE name=?") % tableName); std::string query = boost::str(boost::format("SELECT id FROM %1% WHERE name=?") % tableName);
db::CachedStatement stmt(query.c_str(), dbConnection); db::Statement stmt(query.c_str(), dbConnection);
stmt.BindText(0, fieldValue); stmt.BindText(0, fieldValue);
if (stmt.Step() == db::Row) { if (stmt.Step() == db::Row) {
@ -524,7 +527,7 @@ DBID IndexerTrack::SaveNormalizedFieldValue(
std::string query = boost::str(boost::format( std::string query = boost::str(boost::format(
"INSERT INTO %1% (name, aggregated) VALUES (?, ?)") % tableName); "INSERT INTO %1% (name, aggregated) VALUES (?, ?)") % tableName);
db::CachedStatement stmt(query.c_str(), dbConnection); db::Statement stmt(query.c_str(), dbConnection);
stmt.BindText(0, fieldValue); stmt.BindText(0, fieldValue);
stmt.BindInt(1, isAggregatedValue ? 1 : 0); stmt.BindInt(1, isAggregatedValue ? 1 : 0);

View File

@ -41,5 +41,4 @@
#include <core/db/Connection.h> #include <core/db/Connection.h>
#include <core/db/Statement.h> #include <core/db/Statement.h>
#include <core/db/CachedStatement.h>
#endif #endif