- Added IDecoder::Exhausted() method that returns true if the input

source has been completely depleted, false otherwise
- Fixed a memory leak in LogWindow
- Fixed CddaDataStream to only allow reading one track at a time. This
is a safety mechanism so clients can't thrash the read head. This also
means remote clients can stream real-time transcoded CD audio.
- Fixed some bugs in TranscodingDataStream where temp files weren't
always getting cleaned up properly until a restart
This commit is contained in:
casey langen 2017-08-06 15:14:57 -07:00
parent 791f923892
commit 78a146aa07
15 changed files with 174 additions and 192 deletions

View File

@ -46,6 +46,7 @@ namespace musik { namespace core { namespace sdk {
virtual bool GetBuffer(IBuffer *buffer) = 0; virtual bool GetBuffer(IBuffer *buffer) = 0;
virtual double GetDuration() = 0; virtual double GetDuration() = 0;
virtual bool Open(IDataStream *stream) = 0; virtual bool Open(IDataStream *stream) = 0;
virtual bool Exhausted() = 0;
}; };
} } } } } }

View File

@ -59,6 +59,7 @@ LogWindow::LogWindow(IWindow *parent)
} }
LogWindow::~LogWindow() { LogWindow::~LogWindow() {
delete this->adapter;
} }
IScrollAdapter& LogWindow::GetScrollAdapter() { IScrollAdapter& LogWindow::GetScrollAdapter() {

View File

@ -39,18 +39,8 @@
#include <set> #include <set>
#include <mutex> #include <mutex>
#if ENABLE_LOOKAHEAD_BUFFER
/* as it turns out having a small lookahead buffer is better than a
large one. if the buffer is too large, the device seems to start to
power down, then takes a while to power back up. we just want a little
but of wiggle room, but to generally keep the device reading small
chunks of data constantly. */
#define MAX_SECTORS_IN_LOOKAHEAD 20
#define MAX_SECTORS_PER_READ 10
#define BUFFER_SIZE_BYTES (MAX_SECTORS_IN_LOOKAHEAD * BYTES_PER_SECTOR)
#endif
static std::mutex driveAccessMutex; /* one track can read at a time */ static std::mutex driveAccessMutex; /* one track can read at a time */
static CddaDataStream* activeRead = nullptr;
CddaDataStream::CddaDataStream() { CddaDataStream::CddaDataStream() {
this->closed = false; this->closed = false;
@ -58,11 +48,6 @@ CddaDataStream::CddaDataStream() {
this->position = this->length = 0; this->position = this->length = 0;
memset(&this->toc, 0, sizeof(this->toc)); memset(&this->toc, 0, sizeof(this->toc));
this->startSector = this->stopSector = 0; this->startSector = this->stopSector = 0;
#if ENABLE_LOOKAHEAD_BUFFER
this->lookahead = new char[BUFFER_SIZE_BYTES];
this->lookaheadOffset = this->lookaheadTotal = 0;
#endif
} }
CddaDataStream::~CddaDataStream() { CddaDataStream::~CddaDataStream() {
@ -154,8 +139,14 @@ bool CddaDataStream::Close() {
this->drive = INVALID_HANDLE_VALUE; this->drive = INVALID_HANDLE_VALUE;
} }
this->closed = true; {
std::lock_guard<std::mutex> lock(driveAccessMutex);
if (activeRead == this) {
activeRead = nullptr;
}
}
this->closed = true;
return true; return true;
} }
@ -168,6 +159,17 @@ void CddaDataStream::Destroy() {
} }
PositionType CddaDataStream::Read(void* buffer, PositionType readBytes) { PositionType CddaDataStream::Read(void* buffer, PositionType readBytes) {
{
std::lock_guard<std::mutex> lock(driveAccessMutex);
if (activeRead == nullptr) {
activeRead = this;
}
}
if (activeRead != this) {
return (PositionType) ReadError::DeviceBusy;
}
if (this->position >= this->length) { if (this->position >= this->length) {
return 0; return 0;
} }
@ -187,12 +189,6 @@ bool CddaDataStream::SetPosition(PositionType position) {
this->position++; this->position++;
} }
#if ENABLE_LOOKAHEAD_BUFFER
this->RefillInternalBuffer();
this->lookaheadOffset = 0;
Sleep(250);
#endif
return true; return true;
} }
@ -220,70 +216,12 @@ const char* CddaDataStream::Uri() {
return uri.c_str(); return uri.c_str();
} }
#if ENABLE_LOOKAHEAD_BUFFER
void CddaDataStream::RefillInternalBuffer() {
std::unique_lock<std::mutex> lock(driveAccessMutex);
LONGLONG pos = this->position;
int iterations = MAX_SECTORS_IN_LOOKAHEAD / MAX_SECTORS_PER_READ;
DWORD totalBytesRead = 0;
RAW_READ_INFO rawReadInfo = { 0 };
rawReadInfo.SectorCount = MAX_SECTORS_PER_READ;
rawReadInfo.TrackMode = CDDA;
DWORD bytesActuallyRead = 0;
while (iterations > 0) {
UINT sectorOffset = this->startSector + (int)(pos / BYTES_PER_SECTOR);
rawReadInfo.DiskOffset.QuadPart = sectorOffset * 2048;
DeviceIoControl(
this->drive,
IOCTL_CDROM_RAW_READ,
&rawReadInfo,
sizeof(rawReadInfo),
&this->lookahead[totalBytesRead],
BYTES_PER_SECTOR * MAX_SECTORS_PER_READ,
&bytesActuallyRead,
0);
totalBytesRead += bytesActuallyRead;
pos += bytesActuallyRead;
if (totalBytesRead == 0) {
break;
}
--iterations;
}
this->lookaheadOffset = 0;
this->lookaheadTotal = totalBytesRead;
}
#endif
HRESULT CddaDataStream::Read(PBYTE pbBuffer, DWORD dwBytesToRead, BOOL bAlign, LPDWORD pdwBytesRead) { HRESULT CddaDataStream::Read(PBYTE pbBuffer, DWORD dwBytesToRead, BOOL bAlign, LPDWORD pdwBytesRead) {
if (this->closed) { if (this->closed) {
pdwBytesRead = 0; pdwBytesRead = 0;
return S_FALSE; return S_FALSE;
} }
#if ENABLE_LOOKAHEAD_BUFFER
size_t avail = this->lookaheadTotal - this->lookaheadOffset;
if (avail == 0) {
this->RefillInternalBuffer();
avail = this->lookaheadTotal;
}
DWORD readSize = min(avail, (size_t) dwBytesToRead);
if (readSize >= 0) {
memcpy(pbBuffer, &this->lookahead[this->lookaheadOffset], readSize);
this->position += readSize;
this->lookaheadOffset += readSize;
}
#else
DWORD readSize = 0; DWORD readSize = 0;
LONGLONG pos = this->position; LONGLONG pos = this->position;
UINT sectorOffset = this->startSector + (int)(pos / BYTES_PER_SECTOR); UINT sectorOffset = this->startSector + (int)(pos / BYTES_PER_SECTOR);
@ -304,7 +242,6 @@ HRESULT CddaDataStream::Read(PBYTE pbBuffer, DWORD dwBytesToRead, BOOL bAlign, L
0); 0);
this->position += readSize; this->position += readSize;
#endif
if (pdwBytesRead) { if (pdwBytesRead) {
*pdwBytesRead = readSize; *pdwBytesRead = readSize;

View File

@ -37,13 +37,16 @@
#include "ntddcdrm.h" #include "ntddcdrm.h"
#include "devioctl.h" #include "devioctl.h"
#include <string> #include <string>
#include <mutex>
#define ENABLE_LOOKAHEAD_BUFFER 0
using namespace musik::core::sdk; using namespace musik::core::sdk;
class CddaDataStream : public IDataStream { class CddaDataStream : public IDataStream {
public: public:
enum class ReadError : int {
DeviceBusy = -128
};
CddaDataStream(); CddaDataStream();
~CddaDataStream(); ~CddaDataStream();
@ -73,11 +76,4 @@ class CddaDataStream : public IDataStream {
UINT firstSector, startSector, stopSector; UINT firstSector, startSector, stopSector;
unsigned long channels; unsigned long channels;
volatile bool closed; volatile bool closed;
#if ENABLE_LOOKAHEAD_BUFFER
char* lookahead;
DWORD lookaheadOffset;
DWORD lookaheadTotal;
void RefillInternalBuffer();
#endif
}; };

View File

@ -43,6 +43,7 @@ CddaDecoder::CddaDecoder() {
this->duration = -1.0f; this->duration = -1.0f;
this->data = nullptr; this->data = nullptr;
this->buffer = new BYTE[CDDA_BUFFER_SIZE]; this->buffer = new BYTE[CDDA_BUFFER_SIZE];
this->exhausted = false;
} }
CddaDecoder::~CddaDecoder() { CddaDecoder::~CddaDecoder() {
@ -87,6 +88,11 @@ bool CddaDecoder::GetBuffer(IBuffer *buffer) {
PositionType count = this->data->Read( PositionType count = this->data->Read(
(void *) this->buffer, CDDA_BUFFER_SIZE); (void *) this->buffer, CDDA_BUFFER_SIZE);
if (count == (PositionType) CddaDataStream::ReadError::DeviceBusy) {
this->exhausted = false; /* stream not exhausted, waiting for data */
return false;
}
if (count > 0) { if (count > 0) {
short* t = (short*) this->buffer; short* t = (short*) this->buffer;
@ -99,5 +105,6 @@ bool CddaDecoder::GetBuffer(IBuffer *buffer) {
return true; return true;
} }
this->exhausted = true;
return false; return false;
} }

View File

@ -47,14 +47,16 @@ class CddaDecoder : public IDecoder {
CddaDecoder(); CddaDecoder();
~CddaDecoder(); ~CddaDecoder();
virtual bool Open(IDataStream* data); virtual bool Open(IDataStream* data) override;
virtual void Destroy(); virtual void Destroy() override;
virtual double SetPosition(double seconds); virtual double SetPosition(double seconds) override;
virtual double GetDuration(); virtual double GetDuration() override;
virtual bool GetBuffer(IBuffer *buffer); virtual bool GetBuffer(IBuffer *buffer) override;
virtual bool Exhausted() override { return this->exhausted; }
private: private:
CddaDataStream* data; CddaDataStream* data;
double duration; double duration;
BYTE* buffer; BYTE* buffer;
bool exhausted;
}; };

View File

@ -47,7 +47,8 @@ FlacDecoder::FlacDecoder()
, sampleRate(0) , sampleRate(0)
, bitsPerSample(0) , bitsPerSample(0)
, totalSamples(0) , totalSamples(0)
, duration(-1.0f) { , duration(-1.0f)
, exhausted(false) {
this->decoder = FLAC__stream_decoder_new(); this->decoder = FLAC__stream_decoder_new();
} }
@ -62,18 +63,21 @@ FlacDecoder::~FlacDecoder() {
} }
FLAC__StreamDecoderReadStatus FlacDecoder::FlacRead( FLAC__StreamDecoderReadStatus FlacDecoder::FlacRead(
const FLAC__StreamDecoder *decoder, const FLAC__StreamDecoder *dec,
FLAC__byte buffer[], FLAC__byte buffer[],
size_t *bytes, size_t *bytes,
void *clientData) void *clientData)
{ {
size_t readBytes = (size_t)((FlacDecoder*) clientData)->stream->Read(buffer,(long)(*bytes)); auto decoder = (FlacDecoder*) clientData;
size_t readBytes = (size_t) decoder->stream->Read(buffer,(long)(*bytes));
*bytes = readBytes; *bytes = readBytes;
if (readBytes == 0) { if (readBytes == 0) {
decoder->exhausted = true;
return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM; return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
} }
else if(readBytes == (size_t) -1) { else if (readBytes == (size_t) -1) {
decoder->exhausted = true;
return FLAC__STREAM_DECODER_READ_STATUS_ABORT; return FLAC__STREAM_DECODER_READ_STATUS_ABORT;
} }

View File

@ -47,11 +47,12 @@ class FlacDecoder : public musik::core::sdk::IDecoder {
FlacDecoder(); FlacDecoder();
~FlacDecoder(); ~FlacDecoder();
virtual void Destroy(); virtual void Destroy() override;
virtual double SetPosition(double seconds); virtual double SetPosition(double seconds) override;
virtual bool GetBuffer(IBuffer *buffer); virtual bool GetBuffer(IBuffer *buffer) override;
virtual double GetDuration(); virtual double GetDuration() override;
virtual bool Open(musik::core::sdk::IDataStream *stream); virtual bool Open(musik::core::sdk::IDataStream *stream) override;
virtual bool Exhausted() override { return this->exhausted; }
private: private:
static FLAC__StreamDecoderReadStatus FlacRead( static FLAC__StreamDecoderReadStatus FlacRead(
@ -104,6 +105,7 @@ class FlacDecoder : public musik::core::sdk::IDecoder {
uint64_t totalSamples; uint64_t totalSamples;
int bitsPerSample; int bitsPerSample;
double duration; double duration;
bool exhausted;
float *outputBuffer; float *outputBuffer;
unsigned long outputBufferSize; unsigned long outputBufferSize;

View File

@ -83,6 +83,7 @@ M4aDecoder::M4aDecoder() {
this->decoderFile = nullptr; this->decoderFile = nullptr;
memset(&decoderCallbacks, 0, sizeof(this->decoderCallbacks)); memset(&decoderCallbacks, 0, sizeof(this->decoderCallbacks));
this->duration = -1.0f; this->duration = -1.0f;
this->exhausted = false;
} }
M4aDecoder::~M4aDecoder() { M4aDecoder::~M4aDecoder() {
@ -90,7 +91,9 @@ M4aDecoder::~M4aDecoder() {
bool M4aDecoder::Open(musik::core::sdk::IDataStream *stream) { bool M4aDecoder::Open(musik::core::sdk::IDataStream *stream) {
decoder = NeAACDecOpen(); decoder = NeAACDecOpen();
if (!decoder) { if (!decoder) {
this->exhausted = true;
return false; return false;
} }
@ -177,10 +180,7 @@ double M4aDecoder::GetDuration() {
} }
bool M4aDecoder::GetBuffer(IBuffer* target) { bool M4aDecoder::GetBuffer(IBuffer* target) {
if (this->decoderSampleId < 0) { if (this->decoderSampleId >= 0) {
return false;
}
void* sampleBuffer = NULL; void* sampleBuffer = NULL;
unsigned char* encodedData = NULL; unsigned char* encodedData = NULL;
unsigned int encodedDataLength = 0; unsigned int encodedDataLength = 0;
@ -189,10 +189,7 @@ bool M4aDecoder::GetBuffer(IBuffer* target) {
long duration = mp4ff_get_sample_duration( long duration = mp4ff_get_sample_duration(
decoderFile, audioTrackId, decoderSampleId); decoderFile, audioTrackId, decoderSampleId);
if (duration <= 0) { if (duration > 0) {
return false;
}
/* read the raw data required */ /* read the raw data required */
int rc = int rc =
@ -206,6 +203,7 @@ bool M4aDecoder::GetBuffer(IBuffer* target) {
decoderSampleId++; decoderSampleId++;
if (rc == 0 || encodedData == NULL) { if (rc == 0 || encodedData == NULL) {
this->exhausted = true;
return false; return false;
} }
@ -218,14 +216,7 @@ bool M4aDecoder::GetBuffer(IBuffer* target) {
free(encodedData); free(encodedData);
if (frameInfo.error > 0) { if (frameInfo.error <= 0 && decoderSampleId <= this->totalSamples) {
return false;
}
if (decoderSampleId > this->totalSamples) {
return false;
}
target->SetSampleRate(frameInfo.samplerate); target->SetSampleRate(frameInfo.samplerate);
target->SetChannels(frameInfo.channels); target->SetChannels(frameInfo.channels);
target->SetSamples(frameInfo.samples); target->SetSamples(frameInfo.samples);
@ -236,4 +227,10 @@ bool M4aDecoder::GetBuffer(IBuffer* target) {
sizeof(float) * frameInfo.samples); sizeof(float) * frameInfo.samples);
return true; return true;
}
}
}
this->exhausted = true;
return false;
} }

View File

@ -43,11 +43,12 @@ class M4aDecoder : public musik::core::sdk::IDecoder {
M4aDecoder(); M4aDecoder();
~M4aDecoder(); ~M4aDecoder();
virtual void Destroy(); virtual void Destroy() override;
virtual double SetPosition(double seconds); virtual double SetPosition(double seconds) override;
virtual bool GetBuffer(musik::core::sdk::IBuffer *buffer); virtual bool GetBuffer(musik::core::sdk::IBuffer *buffer) override;
virtual double GetDuration(); virtual double GetDuration() override;
virtual bool Open(musik::core::sdk::IDataStream *stream); virtual bool Open(musik::core::sdk::IDataStream *stream) override;
virtual bool Exhausted() override { return this->exhausted; }
private: private:
NeAACDecHandle decoder; NeAACDecHandle decoder;
@ -59,4 +60,5 @@ class M4aDecoder : public musik::core::sdk::IDecoder {
unsigned char channelCount; unsigned char channelCount;
long decoderSampleId; long decoderSampleId;
double duration; double duration;
bool exhausted;
}; };

View File

@ -77,6 +77,7 @@ static int nomadClose(void *datasource) {
} }
NomadDecoder::NomadDecoder() { NomadDecoder::NomadDecoder() {
this->exhausted = false;
this->duration = -1.0f; this->duration = -1.0f;
this->nomadContext = nullptr; this->nomadContext = nullptr;
this->callbacks.read = &nomadRead; this->callbacks.read = &nomadRead;
@ -118,7 +119,12 @@ bool NomadDecoder::GetBuffer(IBuffer *buffer) {
buffer->SetSamples(read > 0 ? read : 0); buffer->SetSamples(read > 0 ? read : 0);
buffer->SetSampleRate(info->sample_rate); buffer->SetSampleRate(info->sample_rate);
return (read > 0) ? true : false; if (read > 0) {
return true;
}
this->exhausted = true;
return false;
} }
bool NomadDecoder::Open(IDataStream *stream) { bool NomadDecoder::Open(IDataStream *stream) {
@ -148,7 +154,12 @@ bool NomadDecoder::Open(IDataStream *stream) {
this->nomadContext = nullptr; this->nomadContext = nullptr;
} }
return result ? false : true; if (!result) {
return true;
}
this->exhausted = false;
return false;
} }
/* adapted from http://stackoverflow.com/a/3520427 */ /* adapted from http://stackoverflow.com/a/3520427 */

View File

@ -46,15 +46,16 @@ class NomadDecoder : public musik::core::sdk::IDecoder {
NomadDecoder(); NomadDecoder();
~NomadDecoder(); ~NomadDecoder();
virtual bool Open(musik::core::sdk::IDataStream *dataStream); virtual bool Open(musik::core::sdk::IDataStream *dataStream) override;
virtual double SetPosition(double seconds); virtual double SetPosition(double seconds) override;
virtual bool GetBuffer(musik::core::sdk::IBuffer *buffer); virtual bool GetBuffer(musik::core::sdk::IBuffer *buffer) override;
virtual double GetDuration(); virtual double GetDuration() override;
virtual void Destroy(); virtual void Destroy() override;
virtual bool Exhausted() override { return this->exhausted; }
private: private:
size_t GetId3v2HeaderLength(musik::core::sdk::IDataStream *stream); size_t GetId3v2HeaderLength(musik::core::sdk::IDataStream *stream);
bool exhausted;
double duration; double duration;
nomad_callbacks callbacks; nomad_callbacks callbacks;
nomad *nomadContext; nomad *nomadContext;

View File

@ -39,6 +39,7 @@
OggDecoder::OggDecoder() { OggDecoder::OggDecoder() {
this->duration = -1.0f; this->duration = -1.0f;
this->exhausted = false;
this->oggCallbacks.read_func = &OggRead; this->oggCallbacks.read_func = &OggRead;
this->oggCallbacks.seek_func = &OggSeek; this->oggCallbacks.seek_func = &OggSeek;
this->oggCallbacks.tell_func = &OggTell; this->oggCallbacks.tell_func = &OggTell;
@ -99,6 +100,7 @@ bool OggDecoder::Open(musik::core::sdk::IDataStream *fileStream) {
this->fileStream = fileStream; this->fileStream = fileStream;
if (ov_open_callbacks(this, &this->oggFile, NULL, 0, this->oggCallbacks) != 0) { if (ov_open_callbacks(this, &this->oggFile, NULL, 0, this->oggCallbacks) != 0) {
this->exhausted = true;
return false; return false;
} }
@ -137,6 +139,7 @@ bool OggDecoder::GetBuffer(IBuffer *buffer) {
&bitstream); &bitstream);
if (samplesRead == 0) { if (samplesRead == 0) {
this->exhausted = true;
return false; return false;
} }

View File

@ -45,11 +45,12 @@ class OggDecoder : public IDecoder {
OggDecoder(); OggDecoder();
~OggDecoder(); ~OggDecoder();
virtual void Destroy(); virtual void Destroy() override;
virtual double SetPosition(double second); virtual double SetPosition(double second) override;
virtual bool GetBuffer(IBuffer *buffer); virtual bool GetBuffer(IBuffer *buffer) override;
virtual double GetDuration(); virtual double GetDuration() override;
virtual bool Open(musik::core::sdk::IDataStream *fileStream); virtual bool Open(musik::core::sdk::IDataStream *fileStream) override;
virtual bool Exhausted() { return this->exhausted; }
/* libvorbis callbacks */ /* libvorbis callbacks */
static size_t OggRead(void *buffer, size_t nofParts, size_t partSize, void *datasource); static size_t OggRead(void *buffer, size_t nofParts, size_t partSize, void *datasource);
@ -59,6 +60,7 @@ class OggDecoder : public IDecoder {
private: private:
musik::core::sdk::IDataStream *fileStream; musik::core::sdk::IDataStream *fileStream;
bool exhausted;
OggVorbis_File oggFile; OggVorbis_File oggFile;
ov_callbacks oggCallbacks; ov_callbacks oggCallbacks;
double duration; double duration;

View File

@ -215,6 +215,10 @@ PositionType TranscodingDataStream::Read(void *buffer, PositionType bytesToRead)
hasBuffer = this->decoder->GetBuffer(this->pcmBuffer); hasBuffer = this->decoder->GetBuffer(this->pcmBuffer);
} }
if (!hasBuffer && !decoder->Exhausted()) {
goto internal_error;
}
/* done with spillover. read new data... */ /* done with spillover. read new data... */
while (hasBuffer && bytesWritten < (size_t) bytesToRead) { while (hasBuffer && bytesWritten < (size_t) bytesToRead) {
/* calculated according to lame.h */ /* calculated according to lame.h */
@ -314,11 +318,19 @@ PositionType TranscodingDataStream::Read(void *buffer, PositionType bytesToRead)
if (bytesWritten == 0) { if (bytesWritten == 0) {
encodedBytes.reset(); encodedBytes.reset();
size_t count = lame_encode_flush( /* 7200 bytes minimum is required for the flush op; see lame.h */
if (encodedBytes.length < 7200) {
encodedBytes.realloc(7200);
}
int count = lame_encode_flush(
lame, lame,
encodedBytes.data, encodedBytes.data,
encodedBytes.length); encodedBytes.length);
this->eof = true;
if (count >= 0) {
memcpy(dst + bytesWritten, encodedBytes.data, count); memcpy(dst + bytesWritten, encodedBytes.data, count);
if (this->outFile) { if (this->outFile) {
@ -332,8 +344,10 @@ PositionType TranscodingDataStream::Read(void *buffer, PositionType bytesToRead)
boost::filesystem::remove(this->tempFilename, ec); boost::filesystem::remove(this->tempFilename, ec);
} }
} }
}
this->eof = true; else {
goto internal_error;
}
} }
this->position += bytesWritten; this->position += bytesWritten;
@ -343,6 +357,8 @@ internal_error:
this->eof = true; this->eof = true;
fclose(this->outFile); fclose(this->outFile);
this->outFile = nullptr; this->outFile = nullptr;
boost::system::error_code ec;
boost::filesystem::remove(this->tempFilename, ec);
return 0; return 0;
} }