PulseAudio output plugin seems to actually play stuff. But it's not pretty.

This commit is contained in:
clangen 2016-06-06 02:07:50 -07:00
parent 33bd0240b6
commit 3b05e294a8
2 changed files with 159 additions and 71 deletions

View File

@ -1,7 +1,7 @@
#include "PulseOut.h" #include "PulseOut.h"
#include <iostream> #include <iostream>
#define BUFFER_COUNT 8 #define BUFFER_COUNT 48 /* seems to work well? */
using namespace musik::core::audio; using namespace musik::core::audio;
@ -23,7 +23,7 @@ class MainLoopLock {
static bool waitForCompletion(pa_operation *op, pa_threaded_mainloop *loop) { static bool waitForCompletion(pa_operation *op, pa_threaded_mainloop *loop) {
if (op) { if (op) {
pa_operation_state_t state; pa_operation_state_t state;
while ((state == pa_operation_get_state(op)) == PA_OPERATION_RUNNING) { while ((state = pa_operation_get_state(op)) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait(loop); pa_threaded_mainloop_wait(loop);
} }
@ -45,9 +45,16 @@ size_t PulseOut::CountBuffersWithProvider(IBufferProvider* provider) {
} }
++it; ++it;
} }
return count; return count;
} }
long long bufferLengthMicroSeconds(IBuffer* buffer) {
long long samples = buffer->Samples();
long long rate = buffer->SampleRate();
return (samples * 1000000) / rate;
}
bool PulseOut::RemoveBufferFromQueue(BufferContext* context) { bool PulseOut::RemoveBufferFromQueue(BufferContext* context) {
boost::recursive_mutex::scoped_lock bufferLock(this->mutex); boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
@ -55,6 +62,7 @@ bool PulseOut::RemoveBufferFromQueue(BufferContext* context) {
while (it != this->buffers.end()) { while (it != this->buffers.end()) {
if ((*it).get() == context) { if ((*it).get() == context) {
this->buffers.erase(it); this->buffers.erase(it);
this->bufferQueueLength -= bufferLengthMicroSeconds((*it)->buffer);
return true; return true;
} }
++it; ++it;
@ -66,10 +74,7 @@ bool PulseOut::RemoveBufferFromQueue(BufferContext* context) {
void PulseOut::NotifyBufferCompleted(BufferContext* context) { void PulseOut::NotifyBufferCompleted(BufferContext* context) {
IBufferProvider* provider = context->provider; IBufferProvider* provider = context->provider;
IBuffer* buffer = context->buffer; IBuffer* buffer = context->buffer;
provider->OnBufferProcessed(buffer);
if (this->RemoveBufferFromQueue(context)) {
provider->OnBufferProcessed(buffer);
}
} }
PulseOut::PulseOut() { PulseOut::PulseOut() {
@ -80,8 +85,9 @@ PulseOut::PulseOut() {
this->pulseStreamFormat.format = PA_SAMPLE_FLOAT32LE; this->pulseStreamFormat.format = PA_SAMPLE_FLOAT32LE;
this->pulseStreamFormat.rate = 0; this->pulseStreamFormat.rate = 0;
this->pulseStreamFormat.channels = 0; this->pulseStreamFormat.channels = 0;
this->bufferQueueLength = 0.0f;
boost::thread th(boost::bind(&PulseOut::ThreadProc,this)); boost::thread th(boost::bind(&PulseOut::ThreadProc, this));
th.detach(); th.detach();
this->InitPulse(); this->InitPulse();
@ -93,19 +99,19 @@ bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
this->pulseStreamFormat.channels != buffer->Channels()) this->pulseStreamFormat.channels != buffer->Channels())
{ {
if (this->pulseStream) { if (this->pulseStream) {
std::cerr << "fixme: stream switched formats; not handled\n"; std::cerr << "PulseOut: stream switched formats; not handled\n";
return false; return false;
} }
this->DeinitPulseStream(); this->DeinitPulseStream();
if (!this->InitPulseStream(buffer->SampleRate(), buffer->Channels())) { if (!this->InitPulseStream(buffer->SampleRate(), buffer->Channels())) {
std::cerr << "could not initialize stream for playback\n"; std::cerr << "PulseOut: could not initialize stream for playback\n";
return false; return false;
} }
} }
if (this->CountBuffersWithProvider(provider) >= BUFFER_COUNT) { if (this->CountBuffersWithProvider(provider) >= BUFFER_COUNT) {
std::cerr << "full!\n"; // std::cerr << "PulseOut: buffers are full!\n";
return false; return false;
} }
@ -116,13 +122,19 @@ bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
{ {
boost::recursive_mutex::scoped_lock bufferLock(this->mutex); boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
long long bufferLength = bufferLengthMicroSeconds(buffer);
//std::cerr << "PulseOut: bufferLength " << bufferLength << std::endl;
pa_usec_t currentTime;
pa_stream_get_time(this->pulseStream, &currentTime);
context->endTime = bufferLength + currentTime + this->bufferQueueLength;
this->bufferQueueLength += bufferLength;
this->buffers.push_back(context); this->buffers.push_back(context);
} }
MainLoopLock loopLock(this->pulseMainLoop); MainLoopLock loopLock(this->pulseMainLoop);
//std::cerr << buffer->Bytes() << std::endl;
int error = int error =
pa_stream_write_ext_free( pa_stream_write_ext_free(
this->pulseStream, this->pulseStream,
@ -134,11 +146,14 @@ bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
PA_SEEK_RELATIVE); PA_SEEK_RELATIVE);
if (error) { if (error) {
std::cerr << "FAILED!! " << error << std::endl; std::cerr << "PulseOut: FAILED!! this should not happen." << error << std::endl;
this->NotifyBufferCompleted(context.get()); this->NotifyBufferCompleted(context.get());
} }
else {
// std::cerr << "PulseOut: buffer enqueued!" << std::endl;
}
// std::cerr << "wrote " << (error ? "unsuccessfully" : "successfully") << std::endl; // std::cerr << "PulseOut: wrote " << (error ? "unsuccessfully" : "successfully") << std::endl;
return !error; return !error;
} }
@ -149,13 +164,56 @@ PulseOut::~PulseOut() {
} }
void PulseOut::ThreadProc() { void PulseOut::ThreadProc() {
pa_usec_t lastTime = -1;
pa_usec_t now;
while (true) { while (true) {
pa_usec_t usec;
if (this->pulseStream) { if (this->pulseStream) {
pa_stream_get_time(this->pulseStream, &usec); pa_stream_get_time(this->pulseStream, &now);
std::cerr << "time: " << usec << std::endl; //std::cerr << "PulseOut: time now: " << now << std::endl;
std::list<std::shared_ptr<BufferContext> > toNotify;
{
//std::cerr << "PulseOut: lastTime " << lastTime << " now " << now << std::endl;
/* detect underrun */
if (lastTime == now) {
std::cerr << "PulseOut: UNDERRUN!" << std::endl;
now = this->buffers.front()->endTime + 1;
}
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
auto it = this->buffers.begin();
while (it != this->buffers.end()) {
if ((*it)->endTime <= now) {
toNotify.push_back(*it);
this->bufferQueueLength -= bufferLengthMicroSeconds((*it)->buffer);
it = this->buffers.erase(it);
}
else {
++it;
}
}
if (!toNotify.size() && this->buffers.size()) {
//std::cerr << "PulseOut: TOP TOO FAR AHEAD AT " << this->buffers.front()->endTime << std::endl;
}
else {
//std::cerr << "PulseOut: we think we have something...?";
}
}
auto it = toNotify.begin();
while (it != toNotify.end()) {
this->NotifyBufferCompleted((*it).get());
++it;
}
} }
usleep(1000 * 1000);
lastTime = now;
usleep(50 * 1000);
} }
} }
@ -173,7 +231,6 @@ void PulseOut::Resume() {
void PulseOut::SetPaused(bool paused) { void PulseOut::SetPaused(bool paused) {
if (this->pulseStream) { if (this->pulseStream) {
std::cerr << "resuming... ";
MainLoopLock loopLock(this->pulseMainLoop); MainLoopLock loopLock(this->pulseMainLoop);
waitForCompletion( waitForCompletion(
pa_stream_cork( pa_stream_cork(
@ -182,7 +239,6 @@ void PulseOut::SetPaused(bool paused) {
&PulseOut::OnPulseStreamSuccessCallback, &PulseOut::OnPulseStreamSuccessCallback,
this), this),
this->pulseMainLoop); this->pulseMainLoop);
std::cerr << "resumed";
} }
} }
@ -190,18 +246,43 @@ void PulseOut::SetVolume(double volume) {
} }
void PulseOut::Stop() { void PulseOut::Stop() {
std::deque<std::shared_ptr<BufferContext> > toNotify;
{
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
if (this->pulseStream) {
MainLoopLock loopLock(this->pulseMainLoop);
std::swap(this->buffers, toNotify);
waitForCompletion(
pa_stream_flush(
this->pulseStream,
&PulseOut::OnPulseStreamSuccessCallback,
this),
this->pulseMainLoop);
}
}
auto it = toNotify.begin();
while (it != toNotify.end()) {
this->bufferQueueLength -= bufferLengthMicroSeconds((*it)->buffer);
this->NotifyBufferCompleted((*it).get());
++it;
}
} }
void PulseOut::OnPulseBufferPlayed(void *data) { void PulseOut::OnPulseBufferPlayed(void *data) {
BufferContext* context = static_cast<BufferContext*>(data); // BufferContext* context = static_cast<BufferContext*>(data);
context->output->NotifyBufferCompleted(context); // context->output->NotifyBufferCompleted(context);
} }
void PulseOut::OnPulseContextStateChanged(pa_context *context, void *data) { void PulseOut::OnPulseContextStateChanged(pa_context *context, void *data) {
PulseOut* out = static_cast<PulseOut*>(data); PulseOut* out = static_cast<PulseOut*>(data);
const pa_context_state_t state = pa_context_get_state(context); const pa_context_state_t state = pa_context_get_state(context);
std::cerr << "context connection state changed: " << state << std::endl; std::cerr << "PulseOut: context connection state changed: " << state << std::endl;
switch (state) { switch (state) {
case PA_CONTEXT_READY: case PA_CONTEXT_READY:
@ -217,7 +298,7 @@ void PulseOut::OnPulseStreamStateChanged(pa_stream *stream, void *data) {
PulseOut* out = static_cast<PulseOut*>(data); PulseOut* out = static_cast<PulseOut*>(data);
const pa_stream_state_t state = pa_stream_get_state(stream); const pa_stream_state_t state = pa_stream_get_state(stream);
std::cerr << "stream connection state changed: " << state << std::endl; std::cerr << "PulseOut: stream connection state changed: " << state << std::endl;
switch (state) { switch (state) {
case PA_STREAM_READY: case PA_STREAM_READY:
@ -234,7 +315,6 @@ void PulseOut::OnPulseStreamSuccessCallback(pa_stream *s, int success, void *dat
pa_threaded_mainloop_signal(out->pulseMainLoop, 0); pa_threaded_mainloop_signal(out->pulseMainLoop, 0);
} }
void PulseOut::InitPulse() { void PulseOut::InitPulse() {
if (!this->InitPulseEventLoopAndContext()) { if (!this->InitPulseEventLoopAndContext()) {
this->DeinitPulse(); this->DeinitPulse();
@ -242,10 +322,10 @@ void PulseOut::InitPulse() {
} }
bool PulseOut::InitPulseEventLoopAndContext() { bool PulseOut::InitPulseEventLoopAndContext() {
std::cerr << "init...\n"; std::cerr << "PulseOut: init...\n";
this->pulseMainLoop = pa_threaded_mainloop_new(); this->pulseMainLoop = pa_threaded_mainloop_new();
if (this->pulseMainLoop) { if (this->pulseMainLoop) {
std::cerr << "init ok, starting...\n"; std::cerr << "PulseOut: init ok, starting...\n";
int error = pa_threaded_mainloop_start(this->pulseMainLoop); int error = pa_threaded_mainloop_start(this->pulseMainLoop);
if (error) { if (error) {
@ -254,7 +334,7 @@ bool PulseOut::InitPulseEventLoopAndContext() {
return false; return false;
} }
std::cerr << "started ok.\n"; std::cerr << "PulseOut: started ok.\n";
} }
pa_mainloop_api* api = pa_threaded_mainloop_get_api(this->pulseMainLoop); pa_mainloop_api* api = pa_threaded_mainloop_get_api(this->pulseMainLoop);
@ -264,7 +344,7 @@ bool PulseOut::InitPulseEventLoopAndContext() {
this->pulseContext = pa_context_new(api, "musikcube"); this->pulseContext = pa_context_new(api, "musikcube");
if (this->pulseContext) { if (this->pulseContext) {
std::cerr << "context created"; std::cerr << "PulseOut: context created";
pa_context_set_state_callback( pa_context_set_state_callback(
this->pulseContext, this->pulseContext,
@ -284,15 +364,15 @@ bool PulseOut::InitPulseEventLoopAndContext() {
pa_context_get_state(this->pulseContext); pa_context_get_state(this->pulseContext);
if (state == PA_CONTEXT_READY) { if (state == PA_CONTEXT_READY) {
std::cerr << "connected!\n"; std::cerr << "PulseOut: connected!\n";
connected = true; connected = true;
} }
else if (!PA_CONTEXT_IS_GOOD(state)) { else if (!PA_CONTEXT_IS_GOOD(state)) {
std::cerr << "corrupted state! bailing.\n"; std::cerr << "PulseOut: corrupted state! bailing.\n";
error = true; error = true;
} }
else { else {
std::cerr << "waiting for connection...\n"; std::cerr << "PulseOut: waiting for connection...\n";
pa_threaded_mainloop_wait(this->pulseMainLoop); pa_threaded_mainloop_wait(this->pulseMainLoop);
} }
} }
@ -306,59 +386,65 @@ bool PulseOut::InitPulseEventLoopAndContext() {
} }
bool PulseOut::InitPulseStream(size_t rate, size_t channels) { bool PulseOut::InitPulseStream(size_t rate, size_t channels) {
MainLoopLock loopLock(this->pulseMainLoop); bool ready = false;
this->pulseStreamFormat.rate = rate; {
this->pulseStreamFormat.channels = channels; MainLoopLock loopLock(this->pulseMainLoop);
this->pulseStream = pa_stream_new( this->pulseStreamFormat.rate = rate;
this->pulseContext, this->pulseStreamFormat.channels = channels;
"musikcube PulseOut stream",
&this->pulseStreamFormat,
NULL); /* channel mapping */
std::cerr << "creating stream...\n"; this->pulseStream = pa_stream_new(
this->pulseContext,
"musikcube PulseOut stream",
&this->pulseStreamFormat,
NULL); /* channel mapping */
if (this->pulseStream) { std::cerr << "PulseOut: creating stream...\n";
std::cerr << "stream created.\n";
pa_stream_set_state_callback( if (this->pulseStream) {
this->pulseStream, std::cerr << "PulseOut: stream created.\n";
&PulseOut::OnPulseStreamStateChanged,
this);
std::cerr << "connecting the stream for playing...\n"; pa_stream_set_state_callback(
this->pulseStream,
&PulseOut::OnPulseStreamStateChanged,
this);
int error = pa_stream_connect_playback( int flags =
this->pulseStream, PA_STREAM_AUTO_TIMING_UPDATE |
NULL, /* device id */ PA_STREAM_INTERPOLATE_TIMING;
NULL, /* buffering attributes */
PA_STREAM_NOFLAGS, /* additional flags */
NULL, /* initial volume. docs suggest NULL. */
NULL); /* stream to synchronize with. */
if (!error) { std::cerr << "PulseOut: connecting the stream for playing...\n";
std::cerr << "connected. waiting for the stream to become ready\n";
pa_threaded_mainloop_wait(this->pulseMainLoop); int error = pa_stream_connect_playback(
bool ready = pa_stream_get_state(this->pulseStream) == PA_STREAM_READY; this->pulseStream,
NULL, /* device id */
NULL, /* buffering attributes */
(pa_stream_flags_t) flags, /* additional flags */
NULL, /* initial volume. docs suggest NULL. */
NULL); /* stream to synchronize with. */
std::cerr << (ready ? "stream is ready!" : "stream failed") << std::endl; if (!error) {
std::cerr << "PulseOut: connected. waiting for the stream to become ready\n";
if (ready) { pa_threaded_mainloop_wait(this->pulseMainLoop);
this->Resume(); ready = pa_stream_get_state(this->pulseStream) == PA_STREAM_READY;
std::cerr << (ready ? "PulseOut: stream is ready!" : "stream failed") << std::endl;
} }
return ready;
} }
} }
return false; if (ready) {
this->Resume();
}
return ready;
} }
void PulseOut::DeinitPulseStream() { void PulseOut::DeinitPulseStream() {
if (this->pulseStream) { if (this->pulseStream) {
std::cerr << "freeing stream...\n"; std::cerr << "PulseOut: freeing stream...\n";
MainLoopLock loopLock(this->pulseMainLoop); MainLoopLock loopLock(this->pulseMainLoop);
pa_stream_disconnect(this->pulseStream); pa_stream_disconnect(this->pulseStream);
pa_stream_unref(this->pulseStream); pa_stream_unref(this->pulseStream);
@ -370,7 +456,7 @@ void PulseOut::DeinitPulse() {
this->DeinitPulseStream(); this->DeinitPulseStream();
if (this->pulseContext) { if (this->pulseContext) {
std::cerr << "freeing context...\n"; std::cerr << "PulseOut: freeing context...\n";
MainLoopLock loopLock(this->pulseMainLoop); MainLoopLock loopLock(this->pulseMainLoop);
pa_context_disconnect(this->pulseContext); pa_context_disconnect(this->pulseContext);
pa_context_unref(this->pulseContext); pa_context_unref(this->pulseContext);
@ -378,7 +464,7 @@ void PulseOut::DeinitPulse() {
} }
if (this->pulseMainLoop) { if (this->pulseMainLoop) {
std::cerr << "stopping...\n"; std::cerr << "PulseOut: stopping...\n";
pa_threaded_mainloop_stop(this->pulseMainLoop); pa_threaded_mainloop_stop(this->pulseMainLoop);
pa_threaded_mainloop_free(this->pulseMainLoop); pa_threaded_mainloop_free(this->pulseMainLoop);
this->pulseMainLoop = NULL; this->pulseMainLoop = NULL;

View File

@ -2,7 +2,7 @@
#include <core/sdk/IOutput.h> #include <core/sdk/IOutput.h>
#include <core/sdk/IBufferProvider.h> #include <core/sdk/IBufferProvider.h>
#include <queue> #include <deque>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/thread/condition.hpp> #include <boost/thread/condition.hpp>
#include <boost/thread/recursive_mutex.hpp> #include <boost/thread/recursive_mutex.hpp>
@ -28,6 +28,7 @@ class PulseOut : public musik::core::audio::IOutput {
PulseOut *output; PulseOut *output;
musik::core::audio::IBuffer *buffer; musik::core::audio::IBuffer *buffer;
musik::core::audio::IBufferProvider *provider; musik::core::audio::IBufferProvider *provider;
long long endTime;
}; };
static void OnPulseContextStateChanged(pa_context *c, void *data); static void OnPulseContextStateChanged(pa_context *c, void *data);
@ -35,7 +36,7 @@ class PulseOut : public musik::core::audio::IOutput {
static void OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data); static void OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data);
static void OnPulseBufferPlayed(void *data); static void OnPulseBufferPlayed(void *data);
void ThreadProc(); void ThreadProc(); /* ugh shoot me */
void NotifyBufferCompleted(BufferContext *context); void NotifyBufferCompleted(BufferContext *context);
bool RemoveBufferFromQueue(BufferContext* context); bool RemoveBufferFromQueue(BufferContext* context);
@ -49,11 +50,12 @@ class PulseOut : public musik::core::audio::IOutput {
void SetPaused(bool paused); void SetPaused(bool paused);
double volume; double volume;
std::list<std::shared_ptr<BufferContext> > buffers; std::deque<std::shared_ptr<BufferContext> > buffers;
boost::thread thread; boost::thread thread;
boost::recursive_mutex mutex; boost::recursive_mutex mutex;
pa_threaded_mainloop* pulseMainLoop; pa_threaded_mainloop* pulseMainLoop;
pa_context* pulseContext; pa_context* pulseContext;
pa_stream* pulseStream; pa_stream* pulseStream;
pa_sample_spec pulseStreamFormat; pa_sample_spec pulseStreamFormat;
double bufferQueueLength;
}; };