diff --git a/CMakeLists.txt b/CMakeLists.txt index d6248b562..d638e8adb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -155,10 +155,8 @@ add_dependencies(musikcubed musikcube) if (CMAKE_SYSTEM_NAME MATCHES "Linux") add_subdirectory(src/plugins/alsaout) add_subdirectory(src/plugins/pulseout) - if (${ENABLE_PIPEWIRE} MATCHES "true") - add_subdirectory(src/plugins/pipewireout) - add_dependencies(musikcube pipewireout) - endif() + add_subdirectory(src/plugins/pipewireout) + add_dependencies(musikcube pipewireout) if (${ENABLE_MPRIS} MATCHES "true") add_subdirectory(src/plugins/mpris) add_dependencies(musikcube mpris) diff --git a/src/plugins/mpris/mpris.h b/src/plugins/mpris/mpris.h index e10752b9e..92d145c75 100644 --- a/src/plugins/mpris/mpris.h +++ b/src/plugins/mpris/mpris.h @@ -5,6 +5,7 @@ #include #include #include +#include extern "C" { #include diff --git a/src/plugins/pipewireout/CMakeLists.txt b/src/plugins/pipewireout/CMakeLists.txt index fcd67615c..7a4fb8ee3 100644 --- a/src/plugins/pipewireout/CMakeLists.txt +++ b/src/plugins/pipewireout/CMakeLists.txt @@ -5,5 +5,8 @@ set (pipewireout_SOURCES message(STATUS "[pipewireout] plugin enabled") +include_directories("/usr/include/spa-0.2") +include_directories("/usr/include/pipewire-0.3") + add_library(pipewireout SHARED ${pipewireout_SOURCES}) target_link_libraries(pipewireout ${musikcube_LINK_LIBS} pipewire-0.3) diff --git a/src/plugins/pipewireout/PipeWireOut.cpp b/src/plugins/pipewireout/PipeWireOut.cpp index 0418fd878..c378a9323 100644 --- a/src/plugins/pipewireout/PipeWireOut.cpp +++ b/src/plugins/pipewireout/PipeWireOut.cpp @@ -42,6 +42,8 @@ #include #include +constexpr size_t MAX_BUFFERS = 16; + static IPreferences* prefs = nullptr; extern "C" void SetPreferences(IPreferences* prefs) { @@ -53,26 +55,70 @@ extern "C" musik::core::sdk::ISchema* GetSchema() { return schema; } -static void onStreamStageChanged( - void *data, - pw_stream_state old, - pw_stream_state state, - const char *error) -{ +void PipeWireOut::OnStreamStateChanged(void* data, enum pw_stream_state old, enum pw_stream_state state, const char* error) { std::cerr << "[PipeWire] state changed from " << old << " to " << state << "\n"; } -static void onStreamProcess(void *userdata) { - std::cerr << "clclcl onProcess\n"; +void PipeWireOut::OnStreamProcess(void* data) { + PipeWireOut* output = static_cast(data); + + struct pw_buffer* pwBuffer; + + if ((pwBuffer = pw_stream_dequeue_buffer(output->pwStream)) == nullptr) { + std::cerr << "[PipeWire] no more output buffers available to fill\n"; + return; + } + + BufferContext* outputBufferContext = nullptr; + + { + std::unique_lock lock(output->mutex); + if (output->buffers.empty()) { + std::cerr << "[PipeWire] no more input buffers available\n"; + return; + } + outputBufferContext = output->buffers.front(); + output->buffers.pop_front(); + } + + struct spa_buffer* spaBuffer = pwBuffer->buffer; + auto& outBufferData = spaBuffer->datas[0]; + void* outBufferPtr = outBufferData.data; + uint32_t outBufferSize = outBufferData.maxsize; + void* inBufferPtr = outputBufferContext->buffer->BufferPointer() ; + uint32_t inBufferSize = (uint32_t) outputBufferContext->buffer->Bytes(); + uint32_t frameSize = sizeof(float) * outputBufferContext->buffer->Channels(); + + std::cerr << "[PipeWire] onProcess " << inBufferSize << " vs " << outBufferSize << "\n"; + + /* TODO: assumes dest size >= src size. need to do additional book keeping in + BufferContext to store offset if target isn't large enough, then push the + struct back */ + + if (inBufferSize > outBufferSize) { + std::cerr << "[PipeWire] onProcess output buffer too small\n"; + exit(0); + } + + memcpy(outBufferPtr, inBufferPtr, inBufferSize); + outBufferData.chunk->offset = 0; + outBufferData.chunk->stride = frameSize; + outBufferData.chunk->size = inBufferSize; + pwBuffer->size = inBufferSize / frameSize; + + outputBufferContext->provider->OnBufferProcessed(outputBufferContext->buffer); + + delete outputBufferContext; + + pw_stream_queue_buffer(output->pwStream, pwBuffer); } -static const struct pw_stream_events streamEvents = { - PW_VERSION_STREAM_EVENTS, - .process = onStreamProcess, - .state_changed = onStreamStageChanged -}; - PipeWireOut::PipeWireOut() { + this->pwStreamEvents = { + PW_VERSION_STREAM_EVENTS, + }; + this->pwStreamEvents.state_changed = PipeWireOut::OnStreamStateChanged; + this->pwStreamEvents.process = OnStreamProcess; } PipeWireOut::~PipeWireOut() { @@ -158,25 +204,32 @@ bool PipeWireOut::StartPipeWire(IBuffer* buffer) { PW_KEY_MEDIA_CATEGORY, "Playback", PW_KEY_MEDIA_ROLE, "Music", NULL), - &streamEvents, + &this->pwStreamEvents, this); if (this->pwStream) { uint8_t intBuffer[1024]; - spa_pod_builder builder = + spa_pod_builder builder = SPA_POD_BUILDER_INIT(intBuffer, sizeof(intBuffer)); const spa_pod *params[1]; - spa_audio_info_raw audioInfo = SPA_AUDIO_INFO_RAW_INIT( - .format = SPA_AUDIO_FORMAT_F32, - .channels = (uint32_t) buffer->Channels(), - .rate = (uint32_t) buffer->SampleRate()); + spa_audio_info_raw audioInfo; + spa_zero(audioInfo); + audioInfo.flags = 0; + audioInfo.format = SPA_AUDIO_FORMAT_F32; + audioInfo.channels = (uint32_t) buffer->Channels(); + audioInfo.rate = (uint32_t) buffer->SampleRate(); params[0] = spa_format_audio_raw_build( &builder, SPA_PARAM_EnumFormat, &audioInfo); + if (!params[0]) { + std::cerr << "[PipeWire] failed to create audio format\n"; + goto cleanup; + } + result = pw_stream_connect( this->pwStream, PW_DIRECTION_OUTPUT, @@ -211,18 +264,18 @@ OutputState PipeWireOut::Play(IBuffer *buffer, IBufferProvider *provider) { } } - // std::cerr << "clclcl here\n"; - - /* TODO: if buffer format changes, drain, then update the params! */ - - if (this->state == State::Paused) { + if (this->state != State::Playing) { return OutputState::InvalidState; } - /* order of operations matters, otherwise overflow. */ - int micros = ((buffer->Samples() * 1000) / buffer->SampleRate() * 1000) / buffer->Channels(); - usleep((long)((float) micros)); - provider->OnBufferProcessed(buffer); + { + std::unique_lock lock(this->mutex); + if (this->buffers.size() >= MAX_BUFFERS) { + return OutputState::BufferFull; + } + this->buffers.push_back(new BufferContext(buffer, provider)); + } + return OutputState::BufferWritten; } diff --git a/src/plugins/pipewireout/PipeWireOut.h b/src/plugins/pipewireout/PipeWireOut.h index b611ccb6e..41b450dae 100644 --- a/src/plugins/pipewireout/PipeWireOut.h +++ b/src/plugins/pipewireout/PipeWireOut.h @@ -39,7 +39,7 @@ #include #include #include -#include +#include using namespace musik::core::sdk; @@ -69,8 +69,19 @@ class PipeWireOut : public IOutput { bool StartPipeWire(IBuffer* buffer); void StopPipeWire(); - struct BufferState { - BufferState(IBuffer* buffer, IBufferProvider* provider) { + static void OnStreamStateChanged( + void* userdata, + enum pw_stream_state old, + enum pw_stream_state state, + const char* error); + + static void OnStreamProcess(void* userdata); + + struct BufferContext { + BufferContext() { + this->buffer = nullptr; this->provider = nullptr; + } + BufferContext(IBuffer* buffer, IBufferProvider* provider) { this->buffer = buffer; this->provider = provider; } IBuffer* buffer; @@ -81,12 +92,12 @@ class PipeWireOut : public IOutput { Stopped, Paused, Playing }; - std::unordered_set buffers; + std::deque buffers; std::recursive_mutex mutex; std::atomic initialized { false }; std::atomic state { State::Stopped }; double volume { 1.0 }; + pw_stream_events pwStreamEvents; pw_thread_loop* pwThreadLoop { nullptr }; pw_stream* pwStream { nullptr }; - };