Stabilized Pipewire. Just a few TODOs left.

This commit is contained in:
casey langen 2021-03-16 23:44:47 -07:00
parent 4e2a46267c
commit 75dac589d0
2 changed files with 157 additions and 85 deletions

View File

@ -41,6 +41,7 @@
#include <spa/utils/result.h>
#include <unistd.h>
#include <iostream>
#include <chrono>
constexpr size_t SAMPLES_PER_BUFFER = 2048;
constexpr size_t SAMPLE_SIZE_BYTES = sizeof(float);
@ -61,80 +62,63 @@ void PipeWireOut::OnStreamStateChanged(void* data, enum pw_stream_state old, enu
std::cerr << "[PipeWire] state changed from " << old << " to " << state << "\n";
}
void PipeWireOut::OnDrained(void* data) {
std::cerr << "[PipeWire] drained\n";
PipeWireOut* self = static_cast<PipeWireOut*>(data);
self->drainCondition.notify_all();
}
void PipeWireOut::OnStreamProcess(void* data) {
PipeWireOut* self = static_cast<PipeWireOut*>(data);
struct pw_buffer* pwBuffer;
if ((pwBuffer = pw_stream_dequeue_buffer(self->pwStream)) == nullptr) {
std::cerr << "[PipeWire] no more output buffers available to fill\n";
return;
}
struct spa_buffer* spaBuffer = pwBuffer->buffer;
auto& outBufferData = spaBuffer->datas[0];
char* outBufferPtr = (char*) outBufferData.data;
uint32_t outBufferRemaining = outBufferData.maxsize;
int channelCount = 2;
static size_t D_PROCESSED = 0;
while (outBufferRemaining > 0) {
BufferContext* inContext = nullptr;
uint32_t inBufferSize = 0;
uint32_t inBufferRemaining = 0;
uint32_t bytesToCopy = 0;
//std::cerr << "[PipeWire] " << outBufferRemaining << " bytes still need to be filled...\n";
{
std::unique_lock<std::recursive_mutex> lock(self->mutex);
while (self->buffers.empty() && self->state != State::Shutdown) {
std::cerr << "[PipeWire] waiting for data...\n";
self->bufferCondition.wait(lock);
if (self->state == State::Shutdown) {
break;
}
}
if (self->state == State::Shutdown) {
std::cerr << "[PipeWire] shutdown detected, so buffers will not be filled.\n";
if (self->state != State::Playing) {
return;
}
inContext = self->buffers.front();
OutBufferContext& outContext = self->outBufferContext;
channelCount = inContext->buffer->Channels();
inBufferSize = (uint32_t) inContext->buffer->Bytes();
inBufferRemaining = inContext->remaining;
if (!outContext.Valid()) {
outContext.Initialize(pw_stream_dequeue_buffer(self->pwStream));
if (!outContext.Valid()) {
// std::cerr << "[PipeWire] no more output buffers available to fill\n";
return;
}
}
if (outBufferRemaining >= inBufferRemaining) {
uint32_t channelCount;
while (outContext.remaining > 0 && !self->buffers.empty()) {
//std::cerr << "[PipeWire] " << outContext.remaining << " bytes still need to be filled...\n";
InBufferContext* inContext = self->buffers.front();
channelCount = (uint32_t) inContext->buffer->Channels();
uint32_t inBufferSize = (uint32_t) inContext->buffer->Bytes();
uint32_t bytesToCopy = std::min(outContext.remaining, inContext->remaining);
if (outContext.remaining >= inContext->remaining) {
self->buffers.pop_front();
self->bufferCondition.notify_all();
}
bytesToCopy = std::min(outBufferRemaining, inBufferRemaining);
memcpy(outContext.writePtr, inContext->readPtr, bytesToCopy);
inContext->Advance(bytesToCopy); /* will `delete this` if emptied */
outContext.Advance(bytesToCopy);
}
//std::cerr << "[PipeWire] popped buffer #" << D_PROCESSED++ << ". Will fill with " << bytesToCopy << " bytes.\n";
memcpy(outBufferPtr, inContext->readPtr, bytesToCopy);
inContext->Advance(bytesToCopy); /* will auto release if empty */
outBufferRemaining -= bytesToCopy;
outBufferPtr += bytesToCopy;
if (outContext.remaining == 0) {
outContext.Finalize(self->pwStream, SAMPLE_SIZE_BYTES * channelCount);
}
}
outBufferData.chunk->offset = 0;
outBufferData.chunk->stride = SAMPLE_SIZE_BYTES * channelCount;
outBufferData.chunk->size = outBufferData.maxsize;
pw_stream_queue_buffer(self->pwStream, pwBuffer);
}
PipeWireOut::PipeWireOut() {
this->pwStreamEvents = {
PW_VERSION_STREAM_EVENTS,
};
this->pwStreamEvents = { PW_VERSION_STREAM_EVENTS };
this->pwStreamEvents.state_changed = PipeWireOut::OnStreamStateChanged;
this->pwStreamEvents.process = OnStreamProcess;
this->pwStreamEvents.process = PipeWireOut::OnStreamProcess;
this->pwStreamEvents.drained = PipeWireOut::OnDrained;
}
PipeWireOut::~PipeWireOut() {
@ -146,12 +130,10 @@ void PipeWireOut::Release() {
}
void PipeWireOut::Pause() {
this->state = State::Paused;
{
std::unique_lock<std::recursive_mutex> lock(this->mutex);
this->bufferCondition.notify_all();
this->state = State::Paused;
}
{
if (this->pwThreadLoop && this->pwStream) {
pw_thread_loop_lock(this->pwThreadLoop);
@ -162,13 +144,10 @@ void PipeWireOut::Pause() {
}
void PipeWireOut::Resume() {
this->state = State::Playing;
{
std::unique_lock<std::recursive_mutex> lock(this->mutex);
this->bufferCondition.notify_all();
this->state = State::Playing;
}
{
if (this->pwThreadLoop && this->pwStream) {
pw_thread_loop_lock(this->pwThreadLoop);
@ -189,11 +168,37 @@ double PipeWireOut::GetVolume() {
}
void PipeWireOut::Stop() {
std::unique_lock<std::recursive_mutex> lock(this->mutex);
this->DiscardInputBuffers();
this->state = State::Stopped;
if (this->pwThreadLoop && this->pwStream) {
pw_thread_loop_lock(this->pwThreadLoop);
pw_stream_set_active(this->pwStream, false);
pw_stream_flush(this->pwStream, false);
pw_thread_loop_unlock(this->pwThreadLoop);
}
}
void PipeWireOut::DiscardInputBuffers() {
std::unique_lock<std::recursive_mutex> lock(this->mutex);
for (auto& buffer : this->buffers) {
buffer->Discard();
}
this->buffers.clear();
this->bufferCondition.notify_all();
}
void PipeWireOut::Drain() {
/* CAL TODO */
std::unique_lock<std::recursive_mutex> lock(this->mutex);
while (this->buffers.size()) {
bufferCondition.wait(lock);
}
if (this->pwThreadLoop && this->pwStream) {
pw_thread_loop_lock(this->pwThreadLoop);
pw_stream_flush(this->pwStream, true);
pw_thread_loop_unlock(this->pwThreadLoop);
drainCondition.wait_for(lock, std::chrono::milliseconds(10000));
}
}
IDeviceList* PipeWireOut::GetDeviceList() {
@ -214,16 +219,20 @@ IDevice* PipeWireOut::GetDefaultDevice() {
void PipeWireOut::StopPipeWire() {
std::cerr << "[PipeWire] shutdown started...\n";
this->Stop();
{
std::unique_lock<std::recursive_mutex> lock(this->mutex);
this->state = State::Shutdown;
this->bufferCondition.notify_all();
}
if (this->pwThreadLoop) {
pw_thread_loop_stop(this->pwThreadLoop);
if (this->pwStream) {
this->outBufferContext.Finalize(
this->pwStream,
SAMPLE_SIZE_BYTES * this->channelCount);
pw_stream_destroy(this->pwStream);
this->pwStream = nullptr;
}
@ -233,6 +242,8 @@ void PipeWireOut::StopPipeWire() {
}
this->initialized = false;
this->channelCount = 0;
this->sampleRate = 0;
std::cerr << "[PipeWire] shutdown complete.\n";
}
@ -269,11 +280,14 @@ bool PipeWireOut::StartPipeWire(IBuffer* buffer) {
spa_pod_builder builder = SPA_POD_BUILDER_INIT(builderBuffer, sizeof(builderBuffer));
const spa_pod *params[2];
this->channelCount = buffer->Channels();
this->sampleRate = buffer->SampleRate();
spa_audio_info_raw audioInfo;
spa_zero(audioInfo);
audioInfo.format = SPA_AUDIO_FORMAT_F32;
audioInfo.channels = (uint32_t) buffer->Channels();
audioInfo.rate = (uint32_t) buffer->SampleRate();
audioInfo.channels = (uint32_t) this->channelCount;
audioInfo.rate = (uint32_t) this->sampleRate;
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &audioInfo);
@ -327,7 +341,15 @@ OutputState PipeWireOut::Play(IBuffer *buffer, IBufferProvider *provider) {
}
}
/* CAL TODO: re-init stream if buffer format changes */
if (this->channelCount != buffer->Channels() || this->sampleRate != buffer->SampleRate()) {
State lastState = this->state;
this->Drain();
this->StopPipeWire();
if (!this->StartPipeWire(buffer)) {
return OutputState::InvalidState;
}
this->state = lastState;
}
if (this->state != State::Playing) {
return OutputState::InvalidState;
@ -338,8 +360,8 @@ OutputState PipeWireOut::Play(IBuffer *buffer, IBufferProvider *provider) {
if (this->buffers.size() >= MAX_BUFFERS) {
return OutputState::BufferFull;
}
this->buffers.push_back(new BufferContext(buffer, provider));
this->bufferCondition.notify_all();
this->buffers.push_back(new InBufferContext(buffer, provider));
bufferCondition.notify_all();
}
return OutputState::BufferWritten;

View File

@ -39,8 +39,8 @@
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <condition_variable>
using namespace musik::core::sdk;
@ -69,6 +69,7 @@ class PipeWireOut : public IOutput {
private:
bool StartPipeWire(IBuffer* buffer);
void StopPipeWire();
void DiscardInputBuffers();
static void OnStreamStateChanged(
void* userdata,
@ -78,10 +79,10 @@ class PipeWireOut : public IOutput {
static void OnStreamProcess(void* userdata);
struct BufferContext {
BufferContext() {
}
BufferContext(IBuffer* buffer, IBufferProvider* provider) {
static void OnDrained(void* userdata);
struct InBufferContext {
InBufferContext(IBuffer* buffer, IBufferProvider* provider) {
this->buffer = buffer; this->provider = provider;
this->readPtr = (char*) buffer->BufferPointer();
this->remaining = (uint32_t) buffer->Bytes();
@ -95,23 +96,72 @@ class PipeWireOut : public IOutput {
delete this;
}
}
void Discard() {
this->provider->OnBufferProcessed(this->buffer);
delete this;
}
IBuffer* buffer{nullptr};
IBufferProvider* provider{nullptr};
uint32_t remaining{0};
char* readPtr;
};
struct OutBufferContext {
void Initialize(pw_buffer* buffer) {
this->buffer = buffer;
if (buffer) {
struct spa_buffer* spaBuffer = buffer->buffer;
this->writePtr = (char*) spaBuffer->datas[0].data;
this->remaining = spaBuffer->datas[0].maxsize;
this->total = this->remaining;
}
else {
this->Reset();
}
}
void Reset() {
this->buffer = nullptr;
this->writePtr = nullptr;
this->remaining = 0;
this->total = 0;
}
void Advance(int count) {
this->remaining -= count;
this->writePtr += count;
}
void Finalize(pw_stream* stream, uint32_t stride) {
if (this->Valid()) {
spa_data& data = this->buffer->buffer->datas[0];
data.chunk->offset = 0;
data.chunk->stride = stride;
data.chunk->size = this->total - this->remaining;
pw_stream_queue_buffer(stream, this->buffer);
this->Reset();
}
}
bool Valid() {
return this->buffer != nullptr;
}
pw_buffer* buffer{nullptr};
uint32_t remaining{0};
uint32_t total{0};
char* writePtr{nullptr};
};
enum class State {
Stopped, Paused, Playing, Shutdown
};
std::deque<BufferContext*> buffers;
std::deque<InBufferContext*> buffers;
std::recursive_mutex mutex;
std::atomic<bool> initialized{false};
std::atomic<State> state{State::Stopped};
std::condition_variable_any bufferCondition, drainCondition;
double volume{1.0};
pw_stream_events pwStreamEvents;
pw_thread_loop* pwThreadLoop {nullptr};
pw_stream* pwStream {nullptr};
std::condition_variable_any bufferCondition;
pw_stream* pwStream{nullptr};
OutBufferContext outBufferContext;
long channelCount{0};
long sampleRate{0};
};