Incremental work on the pulse audio output. Completely not working.

This commit is contained in:
casey langen 2016-06-05 20:31:15 -07:00
parent 9fbb2898e6
commit 33bd0240b6
2 changed files with 301 additions and 33 deletions

View File

@ -5,13 +5,41 @@
using namespace musik::core::audio;
size_t countBuffersWithProvider(
const std::list<PulseOut::BufferContext*>& buffers,
const IBufferProvider* provider)
{
class MainLoopLock {
public:
MainLoopLock(pa_threaded_mainloop* mainLoop) {
this->mainLoop = mainLoop;
pa_threaded_mainloop_lock(mainLoop);
}
~MainLoopLock() {
pa_threaded_mainloop_unlock(this->mainLoop);
}
private:
pa_threaded_mainloop* mainLoop;
};
static bool waitForCompletion(pa_operation *op, pa_threaded_mainloop *loop) {
if (op) {
pa_operation_state_t state;
while ((state == pa_operation_get_state(op)) == PA_OPERATION_RUNNING) {
pa_threaded_mainloop_wait(loop);
}
pa_operation_unref(op);
return (state == PA_OPERATION_DONE);
}
return false;
}
size_t PulseOut::CountBuffersWithProvider(IBufferProvider* provider) {
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
size_t count = 0;
auto it = buffers.begin();
while (it != buffers.end()) {
auto it = this->buffers.begin();
while (it != this->buffers.end()) {
if ((*it)->provider == provider) {
++count;
}
@ -20,20 +48,28 @@ size_t countBuffersWithProvider(
return count;
}
void PulseOut::NotifyBufferCompleted(BufferContext *context) {
boost::recursive_mutex::scoped_lock lock(this->mutex);
bool PulseOut::RemoveBufferFromQueue(BufferContext* context) {
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
auto it = this->buffers.begin();
while (it != this->buffers.end()) {
if (*it == context) {
if ((*it).get() == context) {
this->buffers.erase(it);
return true;
}
++it;
}
context->provider->OnBufferProcessed(context->buffer);
return false;
}
delete context;
void PulseOut::NotifyBufferCompleted(BufferContext* context) {
IBufferProvider* provider = context->provider;
IBuffer* buffer = context->buffer;
if (this->RemoveBufferFromQueue(context)) {
provider->OnBufferProcessed(buffer);
}
}
PulseOut::PulseOut() {
@ -41,11 +77,70 @@ PulseOut::PulseOut() {
this->pulseMainLoop = 0;;
this->pulseContext = 0;
this->pulseStream = 0;
this->pulseStreamFormat.format = PA_SAMPLE_FLOAT32LE;
this->pulseStreamFormat.rate = 0;
this->pulseStreamFormat.channels = 0;
boost::thread th(boost::bind(&PulseOut::ThreadProc,this));
th.detach();
this->InitPulse();
}
bool PulseOut::Play(IBuffer *buffer, IBufferProvider *provider) {
return false;
if (!this->pulseStream ||
this->pulseStreamFormat.rate != buffer->SampleRate() ||
this->pulseStreamFormat.channels != buffer->Channels())
{
if (this->pulseStream) {
std::cerr << "fixme: stream switched formats; not handled\n";
return false;
}
this->DeinitPulseStream();
if (!this->InitPulseStream(buffer->SampleRate(), buffer->Channels())) {
std::cerr << "could not initialize stream for playback\n";
return false;
}
}
if (this->CountBuffersWithProvider(provider) >= BUFFER_COUNT) {
std::cerr << "full!\n";
return false;
}
std::shared_ptr<BufferContext> context(new BufferContext());
context->output = this;
context->buffer = buffer;
context->provider = provider;
{
boost::recursive_mutex::scoped_lock bufferLock(this->mutex);
this->buffers.push_back(context);
}
MainLoopLock loopLock(this->pulseMainLoop);
//std::cerr << buffer->Bytes() << std::endl;
int error =
pa_stream_write_ext_free(
this->pulseStream,
static_cast<void*>(buffer->BufferPointer()),
buffer->Bytes(),
&PulseOut::OnPulseBufferPlayed,
static_cast<void*>(context.get()),
0,
PA_SEEK_RELATIVE);
if (error) {
std::cerr << "FAILED!! " << error << std::endl;
this->NotifyBufferCompleted(context.get());
}
// std::cerr << "wrote " << (error ? "unsuccessfully" : "successfully") << std::endl;
return !error;
}
PulseOut::~PulseOut() {
@ -53,14 +148,42 @@ PulseOut::~PulseOut() {
this->DeinitPulse();
}
void PulseOut::ThreadProc() {
while (true) {
pa_usec_t usec;
if (this->pulseStream) {
pa_stream_get_time(this->pulseStream, &usec);
std::cerr << "time: " << usec << std::endl;
}
usleep(1000 * 1000);
}
}
void PulseOut::Destroy() {
delete this;
}
void PulseOut::Pause() {
this->SetPaused(true);
}
void PulseOut::Resume() {
this->SetPaused(false);
}
void PulseOut::SetPaused(bool paused) {
if (this->pulseStream) {
std::cerr << "resuming... ";
MainLoopLock loopLock(this->pulseMainLoop);
waitForCompletion(
pa_stream_cork(
this->pulseStream,
paused ? 1 : 0,
&PulseOut::OnPulseStreamSuccessCallback,
this),
this->pulseMainLoop);
std::cerr << "resumed";
}
}
void PulseOut::SetVolume(double volume) {
@ -69,11 +192,16 @@ void PulseOut::SetVolume(double volume) {
void PulseOut::Stop() {
}
void PulseOut::OnPulseContextStateChanged(pa_context *c, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
const pa_context_state_t state = pa_context_get_state(c);
void PulseOut::OnPulseBufferPlayed(void *data) {
BufferContext* context = static_cast<BufferContext*>(data);
context->output->NotifyBufferCompleted(context);
}
std::cerr << PA_CONTEXT_READY << std::endl;
void PulseOut::OnPulseContextStateChanged(pa_context *context, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
const pa_context_state_t state = pa_context_get_state(context);
std::cerr << "context connection state changed: " << state << std::endl;
switch (state) {
case PA_CONTEXT_READY:
@ -85,7 +213,35 @@ void PulseOut::OnPulseContextStateChanged(pa_context *c, void *data) {
}
}
void PulseOut::OnPulseStreamStateChanged(pa_stream *stream, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
const pa_stream_state_t state = pa_stream_get_state(stream);
std::cerr << "stream connection state changed: " << state << std::endl;
switch (state) {
case PA_STREAM_READY:
case PA_STREAM_FAILED:
case PA_STREAM_TERMINATED:
pa_threaded_mainloop_signal(out->pulseMainLoop, 0);
default:
return;
}
}
void PulseOut::OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data) {
PulseOut* out = static_cast<PulseOut*>(data);
pa_threaded_mainloop_signal(out->pulseMainLoop, 0);
}
void PulseOut::InitPulse() {
if (!this->InitPulseEventLoopAndContext()) {
this->DeinitPulse();
}
}
bool PulseOut::InitPulseEventLoopAndContext() {
std::cerr << "init...\n";
this->pulseMainLoop = pa_threaded_mainloop_new();
if (this->pulseMainLoop) {
@ -94,14 +250,17 @@ void PulseOut::InitPulse() {
int error = pa_threaded_mainloop_start(this->pulseMainLoop);
if (error) {
pa_threaded_mainloop_free(this->pulseMainLoop);
this->pulseMainLoop = 0;
return;
this->pulseMainLoop = NULL;
return false;
}
std::cerr << "started ok.\n";
}
pa_mainloop_api* api = pa_threaded_mainloop_get_api(this->pulseMainLoop);
MainLoopLock loopLock(this->pulseMainLoop);
this->pulseContext = pa_context_new(api, "musikcube");
if (this->pulseContext) {
@ -112,20 +271,116 @@ void PulseOut::InitPulse() {
&PulseOut::OnPulseContextStateChanged,
this);
pa_context_connect(
this->pulseContext,
NULL,
PA_CONTEXT_NOFAIL,
NULL);
int error =
pa_context_connect(
this->pulseContext,
NULL,
PA_CONTEXT_NOFAIL,
NULL);
bool connected = false;
while (!error && !connected) {
pa_context_state_t state =
pa_context_get_state(this->pulseContext);
if (state == PA_CONTEXT_READY) {
std::cerr << "connected!\n";
connected = true;
}
else if (!PA_CONTEXT_IS_GOOD(state)) {
std::cerr << "corrupted state! bailing.\n";
error = true;
}
else {
std::cerr << "waiting for connection...\n";
pa_threaded_mainloop_wait(this->pulseMainLoop);
}
}
if (connected && !error) {
return true;
}
}
return false;
}
bool PulseOut::InitPulseStream(size_t rate, size_t channels) {
MainLoopLock loopLock(this->pulseMainLoop);
this->pulseStreamFormat.rate = rate;
this->pulseStreamFormat.channels = channels;
this->pulseStream = pa_stream_new(
this->pulseContext,
"musikcube PulseOut stream",
&this->pulseStreamFormat,
NULL); /* channel mapping */
std::cerr << "creating stream...\n";
if (this->pulseStream) {
std::cerr << "stream created.\n";
pa_stream_set_state_callback(
this->pulseStream,
&PulseOut::OnPulseStreamStateChanged,
this);
std::cerr << "connecting the stream for playing...\n";
int error = pa_stream_connect_playback(
this->pulseStream,
NULL, /* device id */
NULL, /* buffering attributes */
PA_STREAM_NOFLAGS, /* additional flags */
NULL, /* initial volume. docs suggest NULL. */
NULL); /* stream to synchronize with. */
if (!error) {
std::cerr << "connected. waiting for the stream to become ready\n";
pa_threaded_mainloop_wait(this->pulseMainLoop);
bool ready = pa_stream_get_state(this->pulseStream) == PA_STREAM_READY;
std::cerr << (ready ? "stream is ready!" : "stream failed") << std::endl;
if (ready) {
this->Resume();
}
return ready;
}
}
return false;
}
void PulseOut::DeinitPulseStream() {
if (this->pulseStream) {
std::cerr << "freeing stream...\n";
MainLoopLock loopLock(this->pulseMainLoop);
pa_stream_disconnect(this->pulseStream);
pa_stream_unref(this->pulseStream);
this->pulseStream = NULL;
}
}
void PulseOut::DeinitPulse() {
this->DeinitPulseStream();
if (this->pulseContext) {
std::cerr << "freeing context...\n";
MainLoopLock loopLock(this->pulseMainLoop);
pa_context_disconnect(this->pulseContext);
pa_context_unref(this->pulseContext);
this->pulseContext = NULL;
}
if (this->pulseMainLoop) {
std::cerr << "stopping...\n";
pa_threaded_mainloop_stop(this->pulseMainLoop);
pa_threaded_mainloop_free(this->pulseMainLoop);
this->pulseMainLoop = 0;
std::cerr << "stopped\n";
this->pulseMainLoop = NULL;
}
}

View File

@ -10,11 +10,6 @@
class PulseOut : public musik::core::audio::IOutput {
public:
struct BufferContext {
musik::core::audio::IBuffer *buffer;
musik::core::audio::IBufferProvider *provider;
};
PulseOut();
virtual ~PulseOut();
@ -28,19 +23,37 @@ class PulseOut : public musik::core::audio::IOutput {
musik::core::audio::IBuffer *buffer,
musik::core::audio::IBufferProvider *provider);
void NotifyBufferCompleted(BufferContext *context);
private:
struct BufferContext {
PulseOut *output;
musik::core::audio::IBuffer *buffer;
musik::core::audio::IBufferProvider *provider;
};
static void OnPulseContextStateChanged(pa_context *c, void *data);
static void OnPulseStreamStateChanged(pa_stream *s, void *data);
static void OnPulseStreamSuccessCallback(pa_stream *s, int success, void *data);
static void OnPulseBufferPlayed(void *data);
void ThreadProc();
void NotifyBufferCompleted(BufferContext *context);
bool RemoveBufferFromQueue(BufferContext* context);
size_t CountBuffersWithProvider(musik::core::audio::IBufferProvider *provider);
void InitPulse();
bool InitPulseEventLoopAndContext();
bool InitPulseStream(size_t rate, size_t channels);
void DeinitPulseStream();
void DeinitPulse();
void SetPaused(bool paused);
double volume;
std::list<BufferContext*> buffers;
std::list<std::shared_ptr<BufferContext> > buffers;
boost::thread thread;
boost::recursive_mutex mutex;
pa_threaded_mainloop* pulseMainLoop;
pa_context* pulseContext;
pa_stream* pulseStream;
pa_sample_spec pulseStreamFormat;
};