More HttpStream bugfixes around simultaenous Players with the same

endpoint.
This commit is contained in:
casey langen 2020-10-21 23:58:38 -07:00
parent 70579d097f
commit 08a547253f
4 changed files with 36 additions and 57 deletions

View File

@ -50,6 +50,7 @@
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <chrono>
/* meh... */ /* meh... */
#include <../../3rdparty/include/nlohmann/json.hpp> #include <../../3rdparty/include/nlohmann/json.hpp>
@ -65,15 +66,16 @@
#define DLLEXPORT #define DLLEXPORT
#endif #endif
using namespace std::chrono;
using namespace musik::core::sdk; using namespace musik::core::sdk;
namespace al = boost::algorithm; namespace al = boost::algorithm;
static std::mutex globalMutex; static std::mutex globalMutex;
static std::unordered_set<int64_t> idsInProgress;
static IEnvironment* environment; static IEnvironment* environment;
static LruDiskCache diskCache; static LruDiskCache diskCache;
static std::string cachePath; static std::string cachePath;
static IPreferences* prefs; static IPreferences* prefs;
static int64_t nextInstanceId = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
static const int kDefaultMaxCacheFiles = 35; static const int kDefaultMaxCacheFiles = 35;
static const int kDefaultPreCacheSizeBytes = 524288; /*2^19 */ static const int kDefaultPreCacheSizeBytes = 524288; /*2^19 */
@ -144,8 +146,8 @@ class FileReadStream {
this->Reset(); this->Reset();
} }
FileReadStream(const std::string& fn) { FileReadStream(const std::string& fn, int64_t instanceId) {
this->file = diskCache.Open(cacheId(fn), "rb"); this->file = diskCache.Open(cacheId(fn), instanceId, "rb");
this->maxLength = -1; this->maxLength = -1;
Reset(); Reset();
} }
@ -237,6 +239,7 @@ HttpDataStream::HttpDataStream() {
this->state = State::NotStarted; this->state = State::NotStarted;
this->writeFile = nullptr; this->writeFile = nullptr;
this->interrupted = false; this->interrupted = false;
this->instanceId = ++nextInstanceId;
} }
HttpDataStream::~HttpDataStream() { HttpDataStream::~HttpDataStream() {
@ -300,31 +303,19 @@ bool HttpDataStream::Open(const char *rawUri, OpenFlags flags) {
auto const id = cacheId(httpUri); auto const id = cacheId(httpUri);
{
std::unique_lock<std::mutex> globalLock(globalMutex);
if (idsInProgress.find(id) != idsInProgress.end()) {
this->state = State::Conflict;
return false; /* another data stream is already processing this file. */
}
if (diskCache.Cached(id)) { if (diskCache.Cached(id)) {
FILE* file = diskCache.Open(id, "rb", this->type, this->length); FILE* file = diskCache.Open(id, this->instanceId, "rb", this->type, this->length);
if (file) { if (file) {
this->reader = std::make_shared<FileReadStream>(file, this->length); this->reader = std::make_shared<FileReadStream>(file, this->length);
this->state = State::Cached; this->state = State::Cached;
return true; return true;
} }
else { else {
diskCache.Delete(id); diskCache.Delete(id, this->instanceId);
} }
} }
this->ResetFileHandles(); this->ResetFileHandles();
if (this->writeFile && this->reader) {
idsInProgress.insert(id);
}
}
} }
if (this->writeFile && this->reader) { if (this->writeFile && this->reader) {
@ -400,10 +391,10 @@ void HttpDataStream::ResetFileHandles() {
} }
auto const id = cacheId(httpUri); auto const id = cacheId(httpUri);
diskCache.Delete(id); diskCache.Delete(id, this->instanceId);
this->writeFile = diskCache.Open(id, "wb"); this->writeFile = diskCache.Open(id, this->instanceId, "wb");
if (this->writeFile) { if (this->writeFile) {
this->reader = std::make_shared<FileReadStream>(this->httpUri); this->reader = std::make_shared<FileReadStream>(this->httpUri, this->instanceId);
} }
} }
@ -480,23 +471,10 @@ bool HttpDataStream::Close() {
download failed, let's delete the temp file. */ download failed, let's delete the temp file. */
auto id = cacheId(this->httpUri); auto id = cacheId(this->httpUri);
if (this->state == State::Downloaded) { if (this->state == State::Downloaded) {
diskCache.Finalize(id, this->Type()); diskCache.Finalize(id, this->instanceId, this->Type());
} }
else if (this->state != State::Cached) { else if (this->state != State::Cached) {
diskCache.Delete(id); diskCache.Delete(id, this->instanceId);
}
{
/* if we were the instance that was actually downloading this file, let's
go ahead and remove it from the processing set so subsequent instances are
able to load it from cache. */
if (this->state != State::Conflict) {
std::unique_lock<std::mutex> globalLock(globalMutex);
auto it = idsInProgress.find(cacheId(httpUri));
if (it != idsInProgress.end()) {
idsInProgress.erase(it);
}
}
} }
return true; return true;

View File

@ -105,4 +105,5 @@ class HttpDataStream : public IDataStream {
std::shared_ptr<std::thread> downloadThread; std::shared_ptr<std::thread> downloadThread;
std::shared_ptr<FileReadStream> reader; std::shared_ptr<FileReadStream> reader;
int precacheSizeBytes, chunkSizeBytes, maxCacheFiles; int precacheSizeBytes, chunkSizeBytes, maxCacheFiles;
int64_t instanceId;
}; };

View File

@ -49,8 +49,8 @@ namespace al = boost::algorithm;
using Lock = std::unique_lock<std::recursive_mutex>; using Lock = std::unique_lock<std::recursive_mutex>;
static std::string tempFilename(const std::string& root, size_t id) { static std::string tempFilename(const std::string& root, size_t id, int64_t instance) {
return root + "/" + PREFIX + "_" + std::to_string(id) + TEMP_EXTENSION; return root + "/" + PREFIX + "_" + std::to_string(id) + "_" + std::to_string(instance) + TEMP_EXTENSION;
} }
static std::string finalFilename(const std::string& root, size_t id, std::string type) { static std::string finalFilename(const std::string& root, size_t id, std::string type) {
@ -166,14 +166,14 @@ LruDiskCache::EntryPtr LruDiskCache::Parse(const fs::path& path) {
return EntryPtr(); return EntryPtr();
} }
bool LruDiskCache::Finalize(size_t id, std::string type) { bool LruDiskCache::Finalize(size_t id, int64_t instance, std::string type) {
Lock lock(stateMutex); Lock lock(stateMutex);
if (type.size() == 0) { if (type.size() == 0) {
type = "unknown"; type = "unknown";
} }
fs::path src(tempFilename(this->root, id)); fs::path src(tempFilename(this->root, instance, id));
fs::path dst(finalFilename(this->root, id, type)); fs::path dst(finalFilename(this->root, id, type));
if (fs::exists(src)) { if (fs::exists(src)) {
@ -211,13 +211,13 @@ bool LruDiskCache::Cached(size_t id) {
return it != end; return it != end;
} }
FILE* LruDiskCache::Open(size_t id, const std::string& mode) { FILE* LruDiskCache::Open(size_t id, int64_t instance, const std::string& mode) {
std::string type; std::string type;
size_t len; size_t len;
return this->Open(id, mode, type, len); return this->Open(id, instance, mode, type, len);
} }
FILE* LruDiskCache::Open(size_t id, const std::string& mode, std::string& type, size_t& len) { FILE* LruDiskCache::Open(size_t id, int64_t instance, const std::string& mode, std::string& type, size_t& len) {
Lock lock(stateMutex); Lock lock(stateMutex);
auto end = this->cached.end(); auto end = this->cached.end();
@ -241,10 +241,10 @@ FILE* LruDiskCache::Open(size_t id, const std::string& mode, std::string& type,
} }
/* open the file and return it regardless of cache status. */ /* open the file and return it regardless of cache status. */
return result ? result : fopen(tempFilename(this->root, id).c_str(), mode.c_str()); return result ? result : fopen(tempFilename(this->root, instance, id).c_str(), mode.c_str());
} }
void LruDiskCache::Delete(size_t id) { void LruDiskCache::Delete(size_t id, int64_t instance) {
Lock lock(stateMutex); Lock lock(stateMutex);
auto it = this->cached.begin(); auto it = this->cached.begin();
@ -258,7 +258,7 @@ void LruDiskCache::Delete(size_t id) {
} }
} }
rm(tempFilename(this->root, id)); rm(tempFilename(this->root, id, instance));
} }
void LruDiskCache::SortAndPrune() { void LruDiskCache::SortAndPrune() {

View File

@ -46,11 +46,11 @@ class LruDiskCache {
void Purge(); void Purge();
bool Finalize(size_t id, std::string type); bool Finalize(size_t id, int64_t instance, std::string type);
FILE* Open(size_t id, const std::string& mode); FILE* Open(size_t id, int64_t instance, const std::string& mode);
FILE* Open(size_t id, const std::string& mode, std::string& type, size_t& len); FILE* Open(size_t id, int64_t instance, const std::string& mode, std::string& type, size_t& len);
bool Cached(size_t id); bool Cached(size_t id);
void Delete(size_t id); void Delete(size_t id, int64_t instance);
void Touch(size_t id); void Touch(size_t id);
void Init(const std::string& root, size_t maxEntries); void Init(const std::string& root, size_t maxEntries);