More scaffholding.

This commit is contained in:
casey langen 2021-03-14 18:52:12 -07:00
parent dee5150c2e
commit beb95e55a5
5 changed files with 104 additions and 38 deletions

View File

@ -155,10 +155,8 @@ add_dependencies(musikcubed musikcube)
if (CMAKE_SYSTEM_NAME MATCHES "Linux")
add_subdirectory(src/plugins/alsaout)
add_subdirectory(src/plugins/pulseout)
if (${ENABLE_PIPEWIRE} MATCHES "true")
add_subdirectory(src/plugins/pipewireout)
add_dependencies(musikcube pipewireout)
endif()
add_subdirectory(src/plugins/pipewireout)
add_dependencies(musikcube pipewireout)
if (${ENABLE_MPRIS} MATCHES "true")
add_subdirectory(src/plugins/mpris)
add_dependencies(musikcube mpris)

View File

@ -5,6 +5,7 @@
#include <musikcore/sdk/IPlugin.h>
#include <mutex>
#include <thread>
#include <memory>
extern "C" {
#include <systemd/sd-bus.h>

View File

@ -5,5 +5,8 @@ set (pipewireout_SOURCES
message(STATUS "[pipewireout] plugin enabled")
include_directories("/usr/include/spa-0.2")
include_directories("/usr/include/pipewire-0.3")
add_library(pipewireout SHARED ${pipewireout_SOURCES})
target_link_libraries(pipewireout ${musikcube_LINK_LIBS} pipewire-0.3)

View File

@ -42,6 +42,8 @@
#include <unistd.h>
#include <iostream>
constexpr size_t MAX_BUFFERS = 16;
static IPreferences* prefs = nullptr;
extern "C" void SetPreferences(IPreferences* prefs) {
@ -53,26 +55,70 @@ extern "C" musik::core::sdk::ISchema* GetSchema() {
return schema;
}
static void onStreamStageChanged(
void *data,
pw_stream_state old,
pw_stream_state state,
const char *error)
{
void PipeWireOut::OnStreamStateChanged(void* data, enum pw_stream_state old, enum pw_stream_state state, const char* error) {
std::cerr << "[PipeWire] state changed from " << old << " to " << state << "\n";
}
static void onStreamProcess(void *userdata) {
std::cerr << "clclcl onProcess\n";
void PipeWireOut::OnStreamProcess(void* data) {
PipeWireOut* output = static_cast<PipeWireOut*>(data);
struct pw_buffer* pwBuffer;
if ((pwBuffer = pw_stream_dequeue_buffer(output->pwStream)) == nullptr) {
std::cerr << "[PipeWire] no more output buffers available to fill\n";
return;
}
BufferContext* outputBufferContext = nullptr;
{
std::unique_lock<std::recursive_mutex> lock(output->mutex);
if (output->buffers.empty()) {
std::cerr << "[PipeWire] no more input buffers available\n";
return;
}
outputBufferContext = output->buffers.front();
output->buffers.pop_front();
}
struct spa_buffer* spaBuffer = pwBuffer->buffer;
auto& outBufferData = spaBuffer->datas[0];
void* outBufferPtr = outBufferData.data;
uint32_t outBufferSize = outBufferData.maxsize;
void* inBufferPtr = outputBufferContext->buffer->BufferPointer() ;
uint32_t inBufferSize = (uint32_t) outputBufferContext->buffer->Bytes();
uint32_t frameSize = sizeof(float) * outputBufferContext->buffer->Channels();
std::cerr << "[PipeWire] onProcess " << inBufferSize << " vs " << outBufferSize << "\n";
/* TODO: assumes dest size >= src size. need to do additional book keeping in
BufferContext to store offset if target isn't large enough, then push the
struct back */
if (inBufferSize > outBufferSize) {
std::cerr << "[PipeWire] onProcess output buffer too small\n";
exit(0);
}
memcpy(outBufferPtr, inBufferPtr, inBufferSize);
outBufferData.chunk->offset = 0;
outBufferData.chunk->stride = frameSize;
outBufferData.chunk->size = inBufferSize;
pwBuffer->size = inBufferSize / frameSize;
outputBufferContext->provider->OnBufferProcessed(outputBufferContext->buffer);
delete outputBufferContext;
pw_stream_queue_buffer(output->pwStream, pwBuffer);
}
static const struct pw_stream_events streamEvents = {
PW_VERSION_STREAM_EVENTS,
.process = onStreamProcess,
.state_changed = onStreamStageChanged
};
PipeWireOut::PipeWireOut() {
this->pwStreamEvents = {
PW_VERSION_STREAM_EVENTS,
};
this->pwStreamEvents.state_changed = PipeWireOut::OnStreamStateChanged;
this->pwStreamEvents.process = OnStreamProcess;
}
PipeWireOut::~PipeWireOut() {
@ -158,25 +204,32 @@ bool PipeWireOut::StartPipeWire(IBuffer* buffer) {
PW_KEY_MEDIA_CATEGORY, "Playback",
PW_KEY_MEDIA_ROLE, "Music",
NULL),
&streamEvents,
&this->pwStreamEvents,
this);
if (this->pwStream) {
uint8_t intBuffer[1024];
spa_pod_builder builder =
spa_pod_builder builder =
SPA_POD_BUILDER_INIT(intBuffer, sizeof(intBuffer));
const spa_pod *params[1];
spa_audio_info_raw audioInfo = SPA_AUDIO_INFO_RAW_INIT(
.format = SPA_AUDIO_FORMAT_F32,
.channels = (uint32_t) buffer->Channels(),
.rate = (uint32_t) buffer->SampleRate());
spa_audio_info_raw audioInfo;
spa_zero(audioInfo);
audioInfo.flags = 0;
audioInfo.format = SPA_AUDIO_FORMAT_F32;
audioInfo.channels = (uint32_t) buffer->Channels();
audioInfo.rate = (uint32_t) buffer->SampleRate();
params[0] = spa_format_audio_raw_build(
&builder, SPA_PARAM_EnumFormat, &audioInfo);
if (!params[0]) {
std::cerr << "[PipeWire] failed to create audio format\n";
goto cleanup;
}
result = pw_stream_connect(
this->pwStream,
PW_DIRECTION_OUTPUT,
@ -211,18 +264,18 @@ OutputState PipeWireOut::Play(IBuffer *buffer, IBufferProvider *provider) {
}
}
// std::cerr << "clclcl here\n";
/* TODO: if buffer format changes, drain, then update the params! */
if (this->state == State::Paused) {
if (this->state != State::Playing) {
return OutputState::InvalidState;
}
/* order of operations matters, otherwise overflow. */
int micros = ((buffer->Samples() * 1000) / buffer->SampleRate() * 1000) / buffer->Channels();
usleep((long)((float) micros));
provider->OnBufferProcessed(buffer);
{
std::unique_lock<std::recursive_mutex> lock(this->mutex);
if (this->buffers.size() >= MAX_BUFFERS) {
return OutputState::BufferFull;
}
this->buffers.push_back(new BufferContext(buffer, provider));
}
return OutputState::BufferWritten;
}

View File

@ -39,7 +39,7 @@
#include <atomic>
#include <thread>
#include <mutex>
#include <unordered_set>
#include <deque>
using namespace musik::core::sdk;
@ -69,8 +69,19 @@ class PipeWireOut : public IOutput {
bool StartPipeWire(IBuffer* buffer);
void StopPipeWire();
struct BufferState {
BufferState(IBuffer* buffer, IBufferProvider* provider) {
static void OnStreamStateChanged(
void* userdata,
enum pw_stream_state old,
enum pw_stream_state state,
const char* error);
static void OnStreamProcess(void* userdata);
struct BufferContext {
BufferContext() {
this->buffer = nullptr; this->provider = nullptr;
}
BufferContext(IBuffer* buffer, IBufferProvider* provider) {
this->buffer = buffer; this->provider = provider;
}
IBuffer* buffer;
@ -81,12 +92,12 @@ class PipeWireOut : public IOutput {
Stopped, Paused, Playing
};
std::unordered_set<BufferState*> buffers;
std::deque<BufferContext*> buffers;
std::recursive_mutex mutex;
std::atomic<bool> initialized { false };
std::atomic<State> state { State::Stopped };
double volume { 1.0 };
pw_stream_events pwStreamEvents;
pw_thread_loop* pwThreadLoop { nullptr };
pw_stream* pwStream { nullptr };
};