Replaced PulseOut polling implementation with callback-driven one. Unfortunately sometimes the time doesn't update on a regular interval.

This commit is contained in:
clangen 2016-06-07 01:15:14 -07:00
parent 596d29ca4d
commit 2dc8fef1e2
2 changed files with 90 additions and 101 deletions

View File

@ -1,7 +1,7 @@
#include "PulseOut.h"
#include <iostream>
#define BUFFER_COUNT 32 /* seems to work well? */
#define BUFFER_COUNT 8 /* seems to work well? */
using namespace musik::core::audio;
@ -37,13 +37,6 @@ static inline bool waitForCompletion(pa_operation *op, pa_threaded_mainloop *loo
return false;
}
/* converts the buffer length to micro seconds */
static inline long long bufferLengthMicroSeconds(IBuffer* buffer) {
long long samples = buffer->Samples();
long long rate = buffer->SampleRate();
return (samples * 1000000) / rate;
}
static inline void notifyBufferCompleted(PulseOut::BufferContext* context) {
IBufferProvider* provider = context->provider;
IBuffer* buffer = context->buffer;
@ -58,15 +51,13 @@ PulseOut::PulseOut() {
this->pulseStreamFormat.format = PA_SAMPLE_FLOAT32LE;
this->pulseStreamFormat.rate = 0;
this->pulseStreamFormat.channels = 0;
this->bufferQueueLength = 0.0f;
this->quit = false;
this->thread.reset(new boost::thread(boost::bind(&PulseOut::ThreadProc, this)));
this->bytesConsumed = 0;
this->bytesWritten = 0;
this->InitPulse();
}
PulseOut::~PulseOut() {
this->Stop();
this->DeinitPulse();
@ -74,7 +65,6 @@ PulseOut::~PulseOut() {
this->thread->join();
}
void PulseOut::Destroy() {
delete this;
}
@ -94,9 +84,11 @@ void PulseOut::SetVolume(double volume) {
if (this->pulseStream) {
pa_cvolume vol;
pa_cvolume_set(&vol, 2, pa_sw_volume_from_linear(volume));
MainLoopLock loopLock(this->pulseMainLoop);
pa_context_set_sink_input_volume(
this->pulseContext,
pa_stream_get_index(this->pulseStream),
@ -111,26 +103,43 @@ void PulseOut::Stop() {
{
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
std::swap(this->buffers, toNotify);
if (this->pulseStream) {
MainLoopLock loopLock(this->pulseMainLoop);
/* unset the callback so we don't try to immediately re-fill
the buffer. or, worse, deadlock */
pa_stream_set_write_callback(
this->pulseStream, NULL, NULL);
/* notify outside of the critical section */
std::swap(this->buffers, toNotify);
pa_operation* flushOp = NULL;
waitForCompletion(
pa_stream_flush(
{
MainLoopLock loopLock(this->pulseMainLoop);
flushOp = pa_stream_flush(
this->pulseStream,
&PulseOut::OnPulseStreamSuccessCallback,
this),
this->pulseMainLoop);
&PulseOut::OnPulseStreamSuccess,
this);
if (flushOp) {
waitForCompletion(flushOp, this->pulseMainLoop);
}
}
/* reset the callback */
pa_stream_set_write_callback(
this->pulseStream,
&PulseOut::OnPulseStreamWrite,
this);
}
this->bytesConsumed = 0;
this->bytesWritten = 0;
}
/* all buffers are dead. notify the IBufferProvider */
auto it = toNotify.begin();
while (it != toNotify.end()) {
this->bufferQueueLength -= bufferLengthMicroSeconds((*it)->buffer);
notifyBufferCompleted((*it).get());
++it;
}
@ -162,18 +171,10 @@ bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
context->output = this;
context->buffer = buffer;
context->provider = provider;
context->lastByte = this->bytesWritten + buffer->Bytes();
{
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
long long bufferLength = bufferLengthMicroSeconds(buffer);
//std::cerr << "PulseOut: bufferLength " << bufferLength << std::endl;
pa_usec_t currentTime;
pa_stream_get_time(this->pulseStream, &currentTime);
context->endTime = bufferLength + currentTime + this->bufferQueueLength;
this->bufferQueueLength += bufferLength;
this->buffers.push_back(context);
}
@ -192,81 +193,64 @@ bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
std::cerr << "PulseOut: FAILED!! this should not happen." << error << std::endl;
notifyBufferCompleted(context.get());
}
else {
// std::cerr << "PulseOut: buffer enqueued!" << std::endl;
}
// std::cerr << "PulseOut: wrote " << (error ? "unsuccessfully" : "successfully") << std::endl;
return !error;
}
void PulseOut::ThreadProc() {
std::cerr << "PulseOut: timing thread started\n";
pa_usec_t lastTime = -1;
pa_usec_t now;
while (!this->quit) {
if (this->pulseStream) {
std::list<std::shared_ptr<BufferContext> > toNotify;
pa_stream_get_time(this->pulseStream, &now);
{
/* detect underrun. the playback clock is past the first element in
the queue. tweak the now flag so the first buffer gets freed and we can
move on to the next set of audio data. */
if (!pa_stream_is_corked(this->pulseStream) && (lastTime == now)) {
std::cerr << "PulseOut: UNDERRUN!" << std::endl;
now = this->buffers.front()->endTime + 1;
}
/* notify all buffers that have finished playback since the last
iteration through this loop. note we add them to a temporary list
and notify outside of the critical section */
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
auto it = this->buffers.begin();
while (it != this->buffers.end()) {
if ((*it)->endTime <= now) {
toNotify.push_back(*it);
this->bufferQueueLength -= bufferLengthMicroSeconds((*it)->buffer);
it = this->buffers.erase(it);
}
else {
++it;
}
}
}
/* actually notify here */
auto it = toNotify.begin();
while (it != toNotify.end()) {
notifyBufferCompleted((*it).get());
++it;
}
}
lastTime = now;
usleep(50 * 1000);
}
std::cerr << "PulseOut: timing thread finished\n";
}
void PulseOut::SetPaused(bool paused) {
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
if (this->pulseStream) {
MainLoopLock loopLock(this->pulseMainLoop);
waitForCompletion(
pa_stream_cork(
pa_operation* corkOp = NULL;
{
MainLoopLock loopLock(this->pulseMainLoop);
corkOp = pa_stream_cork(
this->pulseStream,
paused ? 1 : 0,
&PulseOut::OnPulseStreamSuccessCallback,
this),
this->pulseMainLoop);
&PulseOut::OnPulseStreamSuccess,
this);
if (corkOp) {
waitForCompletion(corkOp, this->pulseMainLoop);
}
}
}
}
void PulseOut::OnPulseStreamWrite(pa_stream *s, size_t bytes, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
std::list<std::shared_ptr<BufferContext> > toNotify;
{
boost::recursive_mutex::scoped_lock bufferLock(out->mutex);
out->bytesConsumed += bytes;
auto it = out->buffers.begin();
while (it != out->buffers.end()) {
if (out->bytesConsumed >= (*it)->lastByte) {
toNotify.push_back(*it);
it = out->buffers.erase(it);
}
else {
break;
}
}
}
/* actually notify here */
auto it = toNotify.begin();
while (it != toNotify.end()) {
notifyBufferCompleted((*it).get());
++it;
}
// std::cerr << bytes << std::endl;
}
void PulseOut::OnPulseContextStateChanged(pa_context *context, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
const pa_context_state_t state = pa_context_get_state(context);
@ -299,7 +283,7 @@ void PulseOut::OnPulseStreamStateChanged(pa_stream *stream, void *data) {
}
}
void PulseOut::OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data) {
void PulseOut::OnPulseStreamSuccess(pa_stream *s, int success, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
pa_threaded_mainloop_signal(out->pulseMainLoop, 0);
}
@ -399,6 +383,11 @@ bool PulseOut::InitPulseStream(size_t rate, size_t channels) {
&PulseOut::OnPulseStreamStateChanged,
this);
pa_stream_set_write_callback(
this->pulseStream,
&PulseOut::OnPulseStreamWrite,
this);
int flags =
PA_STREAM_AUTO_TIMING_UPDATE |
PA_STREAM_INTERPOLATE_TIMING;

View File

@ -14,7 +14,7 @@ class PulseOut : public musik::core::audio::IOutput {
PulseOut *output;
musik::core::audio::IBuffer *buffer;
musik::core::audio::IBufferProvider *provider;
long long endTime;
unsigned long long lastByte;
};
PulseOut();
@ -33,10 +33,9 @@ class PulseOut : public musik::core::audio::IOutput {
private:
static void OnPulseContextStateChanged(pa_context *c, void *data);
static void OnPulseStreamStateChanged(pa_stream *s, void *data);
static void OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data);
static void OnPulseStreamSuccess(pa_stream *s, int success, void *data);
static void OnPulseBufferPlayed(void *data);
void ThreadProc(); /* ugh shoot me */
static void OnPulseStreamWrite(pa_stream *s, size_t bytes, void *data);
size_t CountBuffersWithProvider(musik::core::audio::IBufferProvider *provider);
@ -55,6 +54,7 @@ class PulseOut : public musik::core::audio::IOutput {
pa_context* pulseContext;
pa_stream* pulseStream;
pa_sample_spec pulseStreamFormat;
double bufferQueueLength;
unsigned long long bytesWritten;
unsigned long long bytesConsumed;
volatile bool quit;
};