Fixed a eventual mutex problem in the Library.

This commit is contained in:
Daniel Önnerby 2008-09-05 20:30:48 +00:00
parent 86bd96e710
commit 31e7c6daa7
13 changed files with 87 additions and 83 deletions

View File

@ -54,7 +54,6 @@ using namespace musik::core;
Library::Base::Base(utfstring identifier)
:identifier(identifier)
,queueCallbackStarted(false)
,bCurrentQueryCanceled(false)
,exit(false)
{
}
@ -217,10 +216,10 @@ bool Library::Base::AddQuery( const Query::Base &query,unsigned int options ){
/////////////////////////////////////////////////////////////////////////////
{
// Lock the mutex for accessing the query queues
// Lock the mutex for accessing the query queues and query status
boost::mutex::scoped_lock lock(this->libraryMutex);
bool bCancelCurrentQuery(false);
bool cancelCurrentQuery(false);
/////////////////////////////////////////////////////////////////////////////
// Clear unparsed queue that match CANCEL options
@ -264,10 +263,10 @@ bool Library::Base::AddQuery( const Query::Base &query,unsigned int options ){
if(this->runningQuery){
if( !(this->runningQuery->options & Query::Options::UnCanceable) ){
if( options & Query::Options::CancelQueue ){
bCancelCurrentQuery = true;
cancelCurrentQuery = true;
}else if( options & Query::Options::CancelSimilar ){
if( this->runningQuery->queryId == queryCopy->queryId ){
bCancelCurrentQuery = true;
cancelCurrentQuery = true;
}
}
}
@ -283,7 +282,7 @@ bool Library::Base::AddQuery( const Query::Base &query,unsigned int options ){
/////////////////////////////////////////////////////////////////////////////
// Cancel currently running query
if(bCancelCurrentQuery){
if(cancelCurrentQuery){
this->CancelCurrentQuery();
}
@ -316,7 +315,7 @@ bool Library::Base::AddQuery( const Query::Base &query,unsigned int options ){
this->ClearFinishedQueries();
if( options & Query::Options::AutoCallback ){ // Should the callbacks be involved?
if( !(queryCopy->status&Query::Base::Status::Canceled) ){ // If not canceled
if( !this->QueryCanceled(queryCopy.get()) ){ // If not canceled
queryCopy->RunCallbacks(this); // Run the callbacks.
}
}
@ -336,9 +335,9 @@ Query::Ptr Library::Base::GetNextQuery(){
boost::mutex::scoped_lock lock(this->libraryMutex);
if(this->incomingQueries.size()!=0){
Query::Ptr oQuery = this->incomingQueries.front(); // Cast back to query
Query::Ptr query = this->incomingQueries.front(); // Cast back to query
this->incomingQueries.pop_front();
return oQuery;
return query;
}
// Or return an empty query
@ -438,11 +437,9 @@ bool Library::Base::RunCallbacks(){
///\see
///CancelCurrentQuery
//////////////////////////////////////////
bool Library::Base::QueryCanceled(){
bool Library::Base::QueryCanceled(Query::Base *query){
boost::mutex::scoped_lock lock(this->libraryMutex);
bool bReturn = this->bCurrentQueryCanceled;
this->bCurrentQueryCanceled = false;
return bReturn;
return query->status&Query::Base::Status::Canceled;
}
//////////////////////////////////////////
@ -458,7 +455,7 @@ bool Library::Base::QueryCanceled(){
///QueryCanceled
//////////////////////////////////////////
void Library::Base::CancelCurrentQuery(){
this->bCurrentQueryCanceled = true;
// this->bCurrentQueryCanceled = true;
}
@ -645,7 +642,8 @@ void Library::Base::CreateDatabase(db::Connection &db){
// Create the playlists-table
db.Execute("CREATE TABLE IF NOT EXISTS playlists ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"name TEXT default ''"
"name TEXT default '',"
"user_id INTEGER default 0"
")");
// Create the playlists-table
db.Execute("CREATE TABLE IF NOT EXISTS playlist_tracks ("
@ -654,8 +652,15 @@ void Library::Base::CreateDatabase(db::Connection &db){
"sort_order INTEGER DEFAULT 0"
")");
// Create the users-table
db.Execute("CREATE TABLE IF NOT EXISTS users ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"name TEXT,"
"login TEXT,"
"password TEXT)");
// INDEXES
db.Execute("CREATE UNIQUE INDEX IF NOT EXISTS users_index ON users (login)");
db.Execute("CREATE UNIQUE INDEX IF NOT EXISTS folders_index ON folders (name,parent_id,path_id)");
db.Execute("CREATE UNIQUE INDEX IF NOT EXISTS paths_index ON paths (path)");
db.Execute("CREATE INDEX IF NOT EXISTS genre_index ON genres (sort_order)");

View File

@ -125,8 +125,7 @@ class Base : boost::noncopyable{
utfstring GetLibraryDirectory();
bool QueryCanceled();
bool QueryCanceled(Query::Base *query);
virtual musik::core::Indexer *Indexer();
@ -201,7 +200,7 @@ class Base : boost::noncopyable{
///\remarks
///This mutex needs to be public
//////////////////////////////////////////
boost::mutex oResultMutex;
boost::mutex resultMutex;
protected:
@ -215,7 +214,7 @@ class Base : boost::noncopyable{
///\see
///CancelCurrentQuery|QueryCanceled
//////////////////////////////////////////
bool bCurrentQueryCanceled;
// bool bCurrentQueryCanceled;
//////////////////////////////////////////
///\brief

View File

@ -150,7 +150,6 @@ void Library::LocalDB::ThreadLoop(){
// Add to the finished queries
{
boost::mutex::scoped_lock lock(this->libraryMutex);
this->bCurrentQueryCanceled = false;
this->runningQuery = query;
this->outgoingQueries.push_back(query);
@ -160,7 +159,10 @@ void Library::LocalDB::ThreadLoop(){
////////////////////////////////////////////////////////////
// Lets parse the query
query->ParseQuery(this,this->db);
if(!this->QueryCanceled(query.get())){
query->ParseQuery(this,this->db);
}
{
boost::mutex::scoped_lock lock(this->libraryMutex);
this->runningQuery.reset();
@ -195,7 +197,6 @@ void Library::LocalDB::ThreadLoop(){
///current running SQL Query
//////////////////////////////////////////
void Library::LocalDB::CancelCurrentQuery( ){
this->bCurrentQueryCanceled = true;
this->db.Interrupt();
}

View File

@ -233,7 +233,6 @@ void Library::Remote::WriteThread(){
// Add to the finished queries
{
boost::mutex::scoped_lock lock(this->libraryMutex);
this->bCurrentQueryCanceled = false;
this->runningQuery = query;
this->outgoingQueries.push_back(query);
@ -282,17 +281,6 @@ void Library::Remote::WriteThread(){
}
//////////////////////////////////////////
///\brief
///Cancel the current running query
///
///This method will also send a sqlite3_interrupt to cancel the
///current running SQL Query
//////////////////////////////////////////
void Library::Remote::CancelCurrentQuery( ){
this->bCurrentQueryCanceled = true;
}
//////////////////////////////////////////
///\brief

View File

@ -70,7 +70,6 @@ class Remote : public Library::Base{
virtual utfstring BasePath();
protected:
void CancelCurrentQuery( );
virtual void Exit();
private:

View File

@ -97,16 +97,6 @@ class Base : public sigslot::has_slots<> {
// Variables:
//////////////////////////////////////////
///\brief
///Current state of the query
///
///\remarks
///iState is protected by the Library::Base::oResultMutex
///
///\see
///musik::core::State
//////////////////////////////////////////
enum Status:int{
Started = 1,
Ended = 2,
@ -116,6 +106,13 @@ class Base : public sigslot::has_slots<> {
Finished = 32
};
//////////////////////////////////////////
///\brief
///Current status of the query
///
///\remarks
///status is protected by the Library::Base::libraryMutex
//////////////////////////////////////////
unsigned int status;
//////////////////////////////////////////

View File

@ -65,11 +65,15 @@ bool Query::ListBase::RunCallbacks(Library::Base *library){
TrackVector trackResultsCopy;
{ // Scope for swapping the results safely
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
metadataResultsCopy.swap(this->metadataResults);
trackResultsCopy.swap(this->trackResults);
}
{
boost::mutex::scoped_lock lock(library->libraryMutex);
if( (this->status & Status::Ended)!=0){
// If the query is finished, this function should return true to report that it is finished.
bReturn = true;
@ -113,7 +117,7 @@ bool Query::ListBase::RunCallbacks(Library::Base *library){
}
if(bReturn){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
// Check for trackinfo update
this->trackInfoEvent(trackInfoTracks,trackInfoDuration,trackInfoSize);
}
@ -138,7 +142,7 @@ Query::ListBase::TrackInfoSignal& Query::ListBase::OnTrackInfoEvent(){
}
bool Query::ListBase::ParseTracksSQL(std::string sql,Library::Base *library,db::Connection &db){
if(this->trackEvent.has_connections() && !library->QueryCanceled()){
if(this->trackEvent.has_connections() && !library->QueryCanceled(this)){
db::Statement selectTracks(sql.c_str(),db);
TrackVector tempTrackResults;
@ -151,7 +155,7 @@ bool Query::ListBase::ParseTracksSQL(std::string sql,Library::Base *library,db::
this->trackInfoTracks++;
if( (++row)%100==0 ){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->trackResults.insert(this->trackResults.end(),tempTrackResults.begin(),tempTrackResults.end());
tempTrackResults.clear();
@ -159,7 +163,7 @@ bool Query::ListBase::ParseTracksSQL(std::string sql,Library::Base *library,db::
}
}
if(!tempTrackResults.empty()){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->trackResults.insert(this->trackResults.end(),tempTrackResults.begin(),tempTrackResults.end());
}
@ -173,16 +177,19 @@ bool Query::ListBase::ParseTracksSQL(std::string sql,Library::Base *library,db::
bool Query::ListBase::SendResults(musik::core::xml::WriterNode &queryNode,Library::Base *library){
bool continueSending(true);
while(continueSending){
while(continueSending && !library->QueryCanceled(this)){
MetadataResults metadataResultsCopy;
TrackVector trackResultsCopy;
{ // Scope for swapping the results safely
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
metadataResultsCopy.swap(this->metadataResults);
trackResultsCopy.swap(this->trackResults);
}
{
boost::mutex::scoped_lock lock(library->libraryMutex);
if( (this->status & Status::Ended)!=0){
// If the query is finished, stop sending
@ -194,7 +201,7 @@ bool Query::ListBase::SendResults(musik::core::xml::WriterNode &queryNode,Librar
if(!metadataResultsCopy.empty()){
// Loop metadata tags for results
for( MetadataResults::iterator metatagResult=metadataResultsCopy.begin();metatagResult!=metadataResultsCopy.end();++metatagResult){
for( MetadataResults::iterator metatagResult=metadataResultsCopy.begin();metatagResult!=metadataResultsCopy.end() && !library->QueryCanceled(this);++metatagResult){
std::string metatag(metatagResult->first);
@ -230,7 +237,7 @@ bool Query::ListBase::SendResults(musik::core::xml::WriterNode &queryNode,Librar
}
// Check for Tracks
if( !trackResultsCopy.empty() ){
if( !trackResultsCopy.empty() && !library->QueryCanceled(this) ){
musik::core::xml::WriterNode tracklist(queryNode,"tracklist");
@ -262,8 +269,8 @@ bool Query::ListBase::SendResults(musik::core::xml::WriterNode &queryNode,Librar
}
}
{
boost::mutex::scoped_lock lock(library->oResultMutex);
if(!library->QueryCanceled(this)){
boost::mutex::scoped_lock lock(library->resultMutex);
// Check for trackinfo update
musik::core::xml::WriterNode trackInfoNode(queryNode,"trackinfo");
trackInfoNode.Content() = boost::lexical_cast<std::string>( this->trackInfoTracks );
@ -288,7 +295,7 @@ bool Query::ListBase::RecieveResults(musik::core::xml::ParserNode &queryNode,Lib
int row(0);
{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey];
}
@ -305,14 +312,14 @@ bool Query::ListBase::RecieveResults(musik::core::xml::ParserNode &queryNode,Lib
);
if( (++row)%10==0 ){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey].insert(this->metadataResults[metakey].end(),tempMetadataValues.begin(),tempMetadataValues.end());
tempMetadataValues.clear();
tempMetadataValues.reserve(10);
}
}
if(!tempMetadataValues.empty()){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey].insert(this->metadataResults[metakey].end(),tempMetadataValues.begin(),tempMetadataValues.end());
}
@ -339,7 +346,7 @@ bool Query::ListBase::RecieveResults(musik::core::xml::ParserNode &queryNode,Lib
}
{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->trackResults.insert(this->trackResults.end(),tempTrackResults.begin(),tempTrackResults.end());
}
@ -360,7 +367,7 @@ bool Query::ListBase::RecieveResults(musik::core::xml::ParserNode &queryNode,Lib
if(values.size()>=3){
try{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->trackInfoTracks = boost::lexical_cast<UINT64>( values[0] );
this->trackInfoDuration = boost::lexical_cast<UINT64>( values[1] );
this->trackInfoSize = boost::lexical_cast<UINT64>( values[2] );

View File

@ -480,7 +480,7 @@ void Query::ListSelection::SQLPrependWhereOrAnd(std::string &sql){
//////////////////////////////////////////
void Query::ListSelection::SQLSelectQuery(const char *metakey,const char *sqlStart,const char *sqlEnd,std::set<std::string> &metakeysSelected,std::string &sqlSelectTrackWhere,Library::Base *library){
if(!library->QueryCanceled()){
if(!library->QueryCanceled(this)){
SelectedMetadata::iterator selected = this->selectedMetadata.find(metakey);
if(selected!=this->selectedMetadata.end()){
@ -508,7 +508,7 @@ void Query::ListSelection::SQLSelectQuery(const char *metakey,const char *sqlSta
///Method called by ParseQuery for every queried metakey
//////////////////////////////////////////
void Query::ListSelection::QueryForMetadata(const char *metakey,const char *sql,std::set<std::string> &metakeysQueried,Library::Base *library,db::Connection &db){
if(library->QueryCanceled())
if(library->QueryCanceled(this))
return;
if(metakeysQueried.find(metakey)!=metakeysQueried.end()){
@ -520,7 +520,7 @@ void Query::ListSelection::QueryForMetadata(const char *metakey,const char *sql,
int row(0);
{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey];
}
@ -535,14 +535,14 @@ void Query::ListSelection::QueryForMetadata(const char *metakey,const char *sql,
);
if( (++row)%10==0 ){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey].insert(this->metadataResults[metakey].end(),tempMetadataValues.begin(),tempMetadataValues.end());
tempMetadataValues.clear();
tempMetadataValues.reserve(10);
}
}
if(!tempMetadataValues.empty()){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->metadataResults[metakey].insert(this->metadataResults[metakey].end(),tempMetadataValues.begin(),tempMetadataValues.end());
}

View File

@ -65,7 +65,7 @@ bool Query::PlaylistLoad::ParseQuery(Library::Base *library,db::Connection &db){
selectTracks.BindInt(0,this->playlistId);
while(selectTracks.Step()==db::Row){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
this->trackResults.push_back(TrackPtr(new Track(selectTracks.ColumnInt(0))));
}
return true;

View File

@ -114,7 +114,7 @@ Query::Ptr Query::PlaylistSave::copy() const{
}
bool Query::PlaylistSave::RunCallbacks(Library::Base *library){
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->libraryMutex);
if( (this->status & Status::Ended)!=0){
this->PlaylistSaved(this->playlistId);
return true;

View File

@ -152,7 +152,7 @@ bool TrackMetadata::ParseQuery(Library::Base *library,db::Connection &db){
bool bCancel(false);
while(!this->aRequestTracks.empty() && !bCancel){
while(!this->aRequestTracks.empty() && !library->QueryCanceled(this)){
TrackPtr track(this->aRequestTracks.back());
this->aRequestTracks.pop_back();
@ -215,23 +215,18 @@ bool TrackMetadata::ParseQuery(Library::Base *library,db::Connection &db){
{
boost::mutex::scoped_lock oLock(library->oResultMutex);
boost::mutex::scoped_lock oLock(library->resultMutex);
this->aResultTracks.push_back(track);
}
}
trackData.Reset();
{
boost::mutex::scoped_lock oLock(library->oResultMutex);
bCancel = ((this->status & Status::Canceled)!=0);
}
}
{
boost::mutex::scoped_lock oLock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->libraryMutex);
this->status |= Status::Ended;
}
@ -245,9 +240,11 @@ bool TrackMetadata::RunCallbacks(Library::Base *library){
// First swap the results so that Query can continue to parse
{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
aResultCopy.swap(this->aResultTracks);
}
{
boost::mutex::scoped_lock lock(library->libraryMutex);
if( (this->status & Status::Ended)!=0){
bReturn = true;
}
@ -384,9 +381,11 @@ bool TrackMetadata::SendResults(musik::core::xml::WriterNode &queryNode,Library:
while(continueSending){
TrackVector resultCopy;
{
boost::mutex::scoped_lock lock(library->oResultMutex);
boost::mutex::scoped_lock lock(library->resultMutex);
resultCopy.swap(this->aResultTracks);
}
{
boost::mutex::scoped_lock lock(library->libraryMutex);
if( (this->status & Status::Ended)!=0){
continueSending = false;
}
@ -467,7 +466,7 @@ bool TrackMetadata::RecieveResults(musik::core::xml::ParserNode &queryNode,Libra
}
{
boost::mutex::scoped_lock oLock(library->oResultMutex);
boost::mutex::scoped_lock oLock(library->resultMutex);
this->aResultTracks.push_back(currentTrack);
}
}

View File

@ -52,6 +52,17 @@ namespace musik{ namespace core{
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////
///\brief
///musik::core::Server is the main class for the musikServer
///
///the Server contains everything from an Indexer,
///a musik::core::server::Connection (Library::Base derivate) for each connected user
///and a HTTP server for streaming tracks.
///
///\see
///Indexer | http::Server | server::Connection
//////////////////////////////////////////
class Server{
public:
// Methods

View File

@ -152,7 +152,6 @@ void Connection::ParseThread(){
// Add to the finished queries
{
boost::mutex::scoped_lock lock(this->libraryMutex);
this->bCurrentQueryCanceled = false;
this->runningQuery = query;
this->outgoingQueries.push_back(query);
@ -255,7 +254,6 @@ void Connection::WriteThread(){
///current running SQL Query
//////////////////////////////////////////
void Connection::CancelCurrentQuery( ){
this->bCurrentQueryCanceled = true;
this->db.Interrupt();
}