diff --git a/src/plugins/httpdatastream/HttpDataStream.cpp b/src/plugins/httpdatastream/HttpDataStream.cpp index a98dd5c79..1ebbcf899 100755 --- a/src/plugins/httpdatastream/HttpDataStream.cpp +++ b/src/plugins/httpdatastream/HttpDataStream.cpp @@ -50,6 +50,7 @@ #include #include #include +#include /* meh... */ #include <../../3rdparty/include/nlohmann/json.hpp> @@ -65,15 +66,16 @@ #define DLLEXPORT #endif +using namespace std::chrono; using namespace musik::core::sdk; namespace al = boost::algorithm; static std::mutex globalMutex; -static std::unordered_set idsInProgress; static IEnvironment* environment; static LruDiskCache diskCache; static std::string cachePath; static IPreferences* prefs; +static int64_t nextInstanceId = duration_cast(system_clock::now().time_since_epoch()).count(); static const int kDefaultMaxCacheFiles = 35; static const int kDefaultPreCacheSizeBytes = 524288; /*2^19 */ @@ -144,8 +146,8 @@ class FileReadStream { this->Reset(); } - FileReadStream(const std::string& fn) { - this->file = diskCache.Open(cacheId(fn), "rb"); + FileReadStream(const std::string& fn, int64_t instanceId) { + this->file = diskCache.Open(cacheId(fn), instanceId, "rb"); this->maxLength = -1; Reset(); } @@ -237,6 +239,7 @@ HttpDataStream::HttpDataStream() { this->state = State::NotStarted; this->writeFile = nullptr; this->interrupted = false; + this->instanceId = ++nextInstanceId; } HttpDataStream::~HttpDataStream() { @@ -300,31 +303,19 @@ bool HttpDataStream::Open(const char *rawUri, OpenFlags flags) { auto const id = cacheId(httpUri); - { - std::unique_lock globalLock(globalMutex); - if (idsInProgress.find(id) != idsInProgress.end()) { - this->state = State::Conflict; - return false; /* another data stream is already processing this file. */ + if (diskCache.Cached(id)) { + FILE* file = diskCache.Open(id, this->instanceId, "rb", this->type, this->length); + if (file) { + this->reader = std::make_shared(file, this->length); + this->state = State::Cached; + return true; } - - if (diskCache.Cached(id)) { - FILE* file = diskCache.Open(id, "rb", this->type, this->length); - if (file) { - this->reader = std::make_shared(file, this->length); - this->state = State::Cached; - return true; - } - else { - diskCache.Delete(id); - } - } - - this->ResetFileHandles(); - - if (this->writeFile && this->reader) { - idsInProgress.insert(id); + else { + diskCache.Delete(id, this->instanceId); } } + + this->ResetFileHandles(); } if (this->writeFile && this->reader) { @@ -400,10 +391,10 @@ void HttpDataStream::ResetFileHandles() { } auto const id = cacheId(httpUri); - diskCache.Delete(id); - this->writeFile = diskCache.Open(id, "wb"); + diskCache.Delete(id, this->instanceId); + this->writeFile = diskCache.Open(id, this->instanceId, "wb"); if (this->writeFile) { - this->reader = std::make_shared(this->httpUri); + this->reader = std::make_shared(this->httpUri, this->instanceId); } } @@ -480,23 +471,10 @@ bool HttpDataStream::Close() { download failed, let's delete the temp file. */ auto id = cacheId(this->httpUri); if (this->state == State::Downloaded) { - diskCache.Finalize(id, this->Type()); + diskCache.Finalize(id, this->instanceId, this->Type()); } else if (this->state != State::Cached) { - diskCache.Delete(id); - } - - { - /* if we were the instance that was actually downloading this file, let's - go ahead and remove it from the processing set so subsequent instances are - able to load it from cache. */ - if (this->state != State::Conflict) { - std::unique_lock globalLock(globalMutex); - auto it = idsInProgress.find(cacheId(httpUri)); - if (it != idsInProgress.end()) { - idsInProgress.erase(it); - } - } + diskCache.Delete(id, this->instanceId); } return true; diff --git a/src/plugins/httpdatastream/HttpDataStream.h b/src/plugins/httpdatastream/HttpDataStream.h index 05b8e5e4e..cab8b3429 100755 --- a/src/plugins/httpdatastream/HttpDataStream.h +++ b/src/plugins/httpdatastream/HttpDataStream.h @@ -105,4 +105,5 @@ class HttpDataStream : public IDataStream { std::shared_ptr downloadThread; std::shared_ptr reader; int precacheSizeBytes, chunkSizeBytes, maxCacheFiles; + int64_t instanceId; }; \ No newline at end of file diff --git a/src/plugins/httpdatastream/LruDiskCache.cpp b/src/plugins/httpdatastream/LruDiskCache.cpp index 015db9996..2040369da 100644 --- a/src/plugins/httpdatastream/LruDiskCache.cpp +++ b/src/plugins/httpdatastream/LruDiskCache.cpp @@ -49,8 +49,8 @@ namespace al = boost::algorithm; using Lock = std::unique_lock; -static std::string tempFilename(const std::string& root, size_t id) { - return root + "/" + PREFIX + "_" + std::to_string(id) + TEMP_EXTENSION; +static std::string tempFilename(const std::string& root, size_t id, int64_t instance) { + return root + "/" + PREFIX + "_" + std::to_string(id) + "_" + std::to_string(instance) + TEMP_EXTENSION; } static std::string finalFilename(const std::string& root, size_t id, std::string type) { @@ -166,14 +166,14 @@ LruDiskCache::EntryPtr LruDiskCache::Parse(const fs::path& path) { return EntryPtr(); } -bool LruDiskCache::Finalize(size_t id, std::string type) { +bool LruDiskCache::Finalize(size_t id, int64_t instance, std::string type) { Lock lock(stateMutex); if (type.size() == 0) { type = "unknown"; } - fs::path src(tempFilename(this->root, id)); + fs::path src(tempFilename(this->root, instance, id)); fs::path dst(finalFilename(this->root, id, type)); if (fs::exists(src)) { @@ -211,13 +211,13 @@ bool LruDiskCache::Cached(size_t id) { return it != end; } -FILE* LruDiskCache::Open(size_t id, const std::string& mode) { +FILE* LruDiskCache::Open(size_t id, int64_t instance, const std::string& mode) { std::string type; size_t len; - return this->Open(id, mode, type, len); + return this->Open(id, instance, mode, type, len); } -FILE* LruDiskCache::Open(size_t id, const std::string& mode, std::string& type, size_t& len) { +FILE* LruDiskCache::Open(size_t id, int64_t instance, const std::string& mode, std::string& type, size_t& len) { Lock lock(stateMutex); auto end = this->cached.end(); @@ -241,10 +241,10 @@ FILE* LruDiskCache::Open(size_t id, const std::string& mode, std::string& type, } /* open the file and return it regardless of cache status. */ - return result ? result : fopen(tempFilename(this->root, id).c_str(), mode.c_str()); + return result ? result : fopen(tempFilename(this->root, instance, id).c_str(), mode.c_str()); } -void LruDiskCache::Delete(size_t id) { +void LruDiskCache::Delete(size_t id, int64_t instance) { Lock lock(stateMutex); auto it = this->cached.begin(); @@ -258,7 +258,7 @@ void LruDiskCache::Delete(size_t id) { } } - rm(tempFilename(this->root, id)); + rm(tempFilename(this->root, id, instance)); } void LruDiskCache::SortAndPrune() { diff --git a/src/plugins/httpdatastream/LruDiskCache.h b/src/plugins/httpdatastream/LruDiskCache.h index d95d30f93..ef98077df 100644 --- a/src/plugins/httpdatastream/LruDiskCache.h +++ b/src/plugins/httpdatastream/LruDiskCache.h @@ -46,11 +46,11 @@ class LruDiskCache { void Purge(); - bool Finalize(size_t id, std::string type); - FILE* Open(size_t id, const std::string& mode); - FILE* Open(size_t id, const std::string& mode, std::string& type, size_t& len); + bool Finalize(size_t id, int64_t instance, std::string type); + FILE* Open(size_t id, int64_t instance, const std::string& mode); + FILE* Open(size_t id, int64_t instance, const std::string& mode, std::string& type, size_t& len); bool Cached(size_t id); - void Delete(size_t id); + void Delete(size_t id, int64_t instance); void Touch(size_t id); void Init(const std::string& root, size_t maxEntries);