From 33bd0240b69600039501b3ef610e22fe91836822 Mon Sep 17 00:00:00 2001 From: casey langen Date: Sun, 5 Jun 2016 20:31:15 -0700 Subject: [PATCH] Incremental work on the pulse audio output. Completely not working. --- src/contrib/pulseout/PulseOut.cpp | 305 +++++++++++++++++++++++++++--- src/contrib/pulseout/PulseOut.h | 29 ++- 2 files changed, 301 insertions(+), 33 deletions(-) diff --git a/src/contrib/pulseout/PulseOut.cpp b/src/contrib/pulseout/PulseOut.cpp index 609b92736..349269676 100644 --- a/src/contrib/pulseout/PulseOut.cpp +++ b/src/contrib/pulseout/PulseOut.cpp @@ -5,13 +5,41 @@ using namespace musik::core::audio; -size_t countBuffersWithProvider( - const std::list& buffers, - const IBufferProvider* provider) -{ +class MainLoopLock { + public: + MainLoopLock(pa_threaded_mainloop* mainLoop) { + this->mainLoop = mainLoop; + pa_threaded_mainloop_lock(mainLoop); + } + + ~MainLoopLock() { + pa_threaded_mainloop_unlock(this->mainLoop); + } + + private: + pa_threaded_mainloop* mainLoop; +}; + +static bool waitForCompletion(pa_operation *op, pa_threaded_mainloop *loop) { + if (op) { + pa_operation_state_t state; + while ((state == pa_operation_get_state(op)) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait(loop); + } + + pa_operation_unref(op); + return (state == PA_OPERATION_DONE); + } + + return false; +} + +size_t PulseOut::CountBuffersWithProvider(IBufferProvider* provider) { + boost::recursive_mutex::scoped_lock bufferLock(this->mutex); + size_t count = 0; - auto it = buffers.begin(); - while (it != buffers.end()) { + auto it = this->buffers.begin(); + while (it != this->buffers.end()) { if ((*it)->provider == provider) { ++count; } @@ -20,20 +48,28 @@ size_t countBuffersWithProvider( return count; } -void PulseOut::NotifyBufferCompleted(BufferContext *context) { - boost::recursive_mutex::scoped_lock lock(this->mutex); +bool PulseOut::RemoveBufferFromQueue(BufferContext* context) { + boost::recursive_mutex::scoped_lock bufferLock(this->mutex); auto it = this->buffers.begin(); while (it != this->buffers.end()) { - if (*it == context) { + if ((*it).get() == context) { this->buffers.erase(it); + return true; } ++it; } - context->provider->OnBufferProcessed(context->buffer); + return false; +} - delete context; +void PulseOut::NotifyBufferCompleted(BufferContext* context) { + IBufferProvider* provider = context->provider; + IBuffer* buffer = context->buffer; + + if (this->RemoveBufferFromQueue(context)) { + provider->OnBufferProcessed(buffer); + } } PulseOut::PulseOut() { @@ -41,11 +77,70 @@ PulseOut::PulseOut() { this->pulseMainLoop = 0;; this->pulseContext = 0; this->pulseStream = 0; + this->pulseStreamFormat.format = PA_SAMPLE_FLOAT32LE; + this->pulseStreamFormat.rate = 0; + this->pulseStreamFormat.channels = 0; + + boost::thread th(boost::bind(&PulseOut::ThreadProc,this)); + th.detach(); + this->InitPulse(); } bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) { - return false; + if (!this->pulseStream || + this->pulseStreamFormat.rate != buffer->SampleRate() || + this->pulseStreamFormat.channels != buffer->Channels()) + { + if (this->pulseStream) { + std::cerr << "fixme: stream switched formats; not handled\n"; + return false; + } + + this->DeinitPulseStream(); + if (!this->InitPulseStream(buffer->SampleRate(), buffer->Channels())) { + std::cerr << "could not initialize stream for playback\n"; + return false; + } + } + + if (this->CountBuffersWithProvider(provider) >= BUFFER_COUNT) { + std::cerr << "full!\n"; + return false; + } + + std::shared_ptr context(new BufferContext()); + context->output = this; + context->buffer = buffer; + context->provider = provider; + + { + boost::recursive_mutex::scoped_lock bufferLock(this->mutex); + this->buffers.push_back(context); + } + + MainLoopLock loopLock(this->pulseMainLoop); + + //std::cerr << buffer->Bytes() << std::endl; + + int error = + pa_stream_write_ext_free( + this->pulseStream, + static_cast(buffer->BufferPointer()), + buffer->Bytes(), + &PulseOut::OnPulseBufferPlayed, + static_cast(context.get()), + 0, + PA_SEEK_RELATIVE); + + if (error) { + std::cerr << "FAILED!! " << error << std::endl; + this->NotifyBufferCompleted(context.get()); + } + + // std::cerr << "wrote " << (error ? "unsuccessfully" : "successfully") << std::endl; + + return !error; } PulseOut::~PulseOut() { @@ -53,14 +148,42 @@ PulseOut::~PulseOut() { this->DeinitPulse(); } +void PulseOut::ThreadProc() { + while (true) { + pa_usec_t usec; + if (this->pulseStream) { + pa_stream_get_time(this->pulseStream, &usec); + std::cerr << "time: " << usec << std::endl; + } + usleep(1000 * 1000); + } +} + void PulseOut::Destroy() { delete this; } void PulseOut::Pause() { + this->SetPaused(true); } void PulseOut::Resume() { + this->SetPaused(false); +} + +void PulseOut::SetPaused(bool paused) { + if (this->pulseStream) { + std::cerr << "resuming... "; + MainLoopLock loopLock(this->pulseMainLoop); + waitForCompletion( + pa_stream_cork( + this->pulseStream, + paused ? 1 : 0, + &PulseOut::OnPulseStreamSuccessCallback, + this), + this->pulseMainLoop); + std::cerr << "resumed"; + } } void PulseOut::SetVolume(double volume) { @@ -69,11 +192,16 @@ void PulseOut::SetVolume(double volume) { void PulseOut::Stop() { } -void PulseOut::OnPulseContextStateChanged(pa_context *c, void *data) { - PulseOut* out = static_cast(data); - const pa_context_state_t state = pa_context_get_state(c); +void PulseOut::OnPulseBufferPlayed(void *data) { + BufferContext* context = static_cast(data); + context->output->NotifyBufferCompleted(context); +} - std::cerr << PA_CONTEXT_READY << std::endl; +void PulseOut::OnPulseContextStateChanged(pa_context *context, void *data) { + PulseOut* out = static_cast(data); + const pa_context_state_t state = pa_context_get_state(context); + + std::cerr << "context connection state changed: " << state << std::endl; switch (state) { case PA_CONTEXT_READY: @@ -85,7 +213,35 @@ void PulseOut::OnPulseContextStateChanged(pa_context *c, void *data) { } } +void PulseOut::OnPulseStreamStateChanged(pa_stream *stream, void *data) { + PulseOut* out = static_cast(data); + const pa_stream_state_t state = pa_stream_get_state(stream); + + std::cerr << "stream connection state changed: " << state << std::endl; + + switch (state) { + case PA_STREAM_READY: + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + pa_threaded_mainloop_signal(out->pulseMainLoop, 0); + default: + return; + } +} + +void PulseOut::OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data) { + PulseOut* out = static_cast(data); + pa_threaded_mainloop_signal(out->pulseMainLoop, 0); +} + + void PulseOut::InitPulse() { + if (!this->InitPulseEventLoopAndContext()) { + this->DeinitPulse(); + } +} + +bool PulseOut::InitPulseEventLoopAndContext() { std::cerr << "init...\n"; this->pulseMainLoop = pa_threaded_mainloop_new(); if (this->pulseMainLoop) { @@ -94,14 +250,17 @@ void PulseOut::InitPulse() { int error = pa_threaded_mainloop_start(this->pulseMainLoop); if (error) { pa_threaded_mainloop_free(this->pulseMainLoop); - this->pulseMainLoop = 0; - return; + this->pulseMainLoop = NULL; + return false; } std::cerr << "started ok.\n"; } pa_mainloop_api* api = pa_threaded_mainloop_get_api(this->pulseMainLoop); + + MainLoopLock loopLock(this->pulseMainLoop); + this->pulseContext = pa_context_new(api, "musikcube"); if (this->pulseContext) { @@ -112,20 +271,116 @@ void PulseOut::InitPulse() { &PulseOut::OnPulseContextStateChanged, this); - pa_context_connect( - this->pulseContext, - NULL, - PA_CONTEXT_NOFAIL, - NULL); + int error = + pa_context_connect( + this->pulseContext, + NULL, + PA_CONTEXT_NOFAIL, + NULL); + + bool connected = false; + while (!error && !connected) { + pa_context_state_t state = + pa_context_get_state(this->pulseContext); + + if (state == PA_CONTEXT_READY) { + std::cerr << "connected!\n"; + connected = true; + } + else if (!PA_CONTEXT_IS_GOOD(state)) { + std::cerr << "corrupted state! bailing.\n"; + error = true; + } + else { + std::cerr << "waiting for connection...\n"; + pa_threaded_mainloop_wait(this->pulseMainLoop); + } + } + + if (connected && !error) { + return true; + } + } + + return false; +} + +bool PulseOut::InitPulseStream(size_t rate, size_t channels) { + MainLoopLock loopLock(this->pulseMainLoop); + + this->pulseStreamFormat.rate = rate; + this->pulseStreamFormat.channels = channels; + + this->pulseStream = pa_stream_new( + this->pulseContext, + "musikcube PulseOut stream", + &this->pulseStreamFormat, + NULL); /* channel mapping */ + + std::cerr << "creating stream...\n"; + + if (this->pulseStream) { + std::cerr << "stream created.\n"; + + pa_stream_set_state_callback( + this->pulseStream, + &PulseOut::OnPulseStreamStateChanged, + this); + + std::cerr << "connecting the stream for playing...\n"; + + int error = pa_stream_connect_playback( + this->pulseStream, + NULL, /* device id */ + NULL, /* buffering attributes */ + PA_STREAM_NOFLAGS, /* additional flags */ + NULL, /* initial volume. docs suggest NULL. */ + NULL); /* stream to synchronize with. */ + + if (!error) { + std::cerr << "connected. waiting for the stream to become ready\n"; + + pa_threaded_mainloop_wait(this->pulseMainLoop); + bool ready = pa_stream_get_state(this->pulseStream) == PA_STREAM_READY; + + std::cerr << (ready ? "stream is ready!" : "stream failed") << std::endl; + + if (ready) { + this->Resume(); + } + + return ready; + } + } + + return false; +} + +void PulseOut::DeinitPulseStream() { + if (this->pulseStream) { + std::cerr << "freeing stream...\n"; + MainLoopLock loopLock(this->pulseMainLoop); + pa_stream_disconnect(this->pulseStream); + pa_stream_unref(this->pulseStream); + this->pulseStream = NULL; } } void PulseOut::DeinitPulse() { + this->DeinitPulseStream(); + + if (this->pulseContext) { + std::cerr << "freeing context...\n"; + MainLoopLock loopLock(this->pulseMainLoop); + pa_context_disconnect(this->pulseContext); + pa_context_unref(this->pulseContext); + this->pulseContext = NULL; + } + if (this->pulseMainLoop) { std::cerr << "stopping...\n"; pa_threaded_mainloop_stop(this->pulseMainLoop); pa_threaded_mainloop_free(this->pulseMainLoop); - this->pulseMainLoop = 0; - std::cerr << "stopped\n"; + this->pulseMainLoop = NULL; } } diff --git a/src/contrib/pulseout/PulseOut.h b/src/contrib/pulseout/PulseOut.h index 2e4bb1b48..6caca5eaf 100644 --- a/src/contrib/pulseout/PulseOut.h +++ b/src/contrib/pulseout/PulseOut.h @@ -10,11 +10,6 @@ class PulseOut : public musik::core::audio::IOutput { public: - struct BufferContext { - musik::core::audio::IBuffer *buffer; - musik::core::audio::IBufferProvider *provider; - }; - PulseOut(); virtual ~PulseOut(); @@ -28,19 +23,37 @@ class PulseOut : public musik::core::audio::IOutput { musik::core::audio::IBuffer *buffer, musik::core::audio::IBufferProvider *provider); - void NotifyBufferCompleted(BufferContext *context); - private: + struct BufferContext { + PulseOut *output; + musik::core::audio::IBuffer *buffer; + musik::core::audio::IBufferProvider *provider; + }; + static void OnPulseContextStateChanged(pa_context *c, void *data); + static void OnPulseStreamStateChanged(pa_stream *s, void *data); + static void OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data); + static void OnPulseBufferPlayed(void *data); + + void ThreadProc(); + + void NotifyBufferCompleted(BufferContext *context); + bool RemoveBufferFromQueue(BufferContext* context); + size_t CountBuffersWithProvider(musik::core::audio::IBufferProvider *provider); void InitPulse(); + bool InitPulseEventLoopAndContext(); + bool InitPulseStream(size_t rate, size_t channels); + void DeinitPulseStream(); void DeinitPulse(); + void SetPaused(bool paused); double volume; - std::list buffers; + std::list > buffers; boost::thread thread; boost::recursive_mutex mutex; pa_threaded_mainloop* pulseMainLoop; pa_context* pulseContext; pa_stream* pulseStream; + pa_sample_spec pulseStreamFormat; };