Prevent queue from growing to large, eating up all memory

This commit is contained in:
loki 2020-04-24 22:10:08 +02:00
parent 1102ac9f3b
commit 1862662b3a
8 changed files with 45 additions and 46 deletions

View File

@ -53,15 +53,15 @@ static opus_stream_config_t HighSurround51 = {
void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t config, void *channel_data) {
//FIXME: Pick correct opus_stream_config_t based on config.channels
auto stream = &stereo;
opus_t opus { opus_multistream_encoder_create(
stream->sampleRate,
stream->channelCount,
stream->streams,
stream->coupledStreams,
stream->mapping,
OPUS_APPLICATION_AUDIO,
nullptr)
};
opus_t opus { opus_multistream_encoder_create(
stream->sampleRate,
stream->channelCount,
stream->streams,
stream->coupledStreams,
stream->mapping,
OPUS_APPLICATION_AUDIO,
nullptr)
};
auto frame_size = config.packetDuration * stream->sampleRate / 1000;
while(auto sample = samples->pop()) {
@ -76,17 +76,19 @@ void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t confi
}
packet.fake_resize(bytes);
packets->raise(std::make_pair(channel_data, std::move(packet)));
packets->raise(channel_data, std::move(packet));
}
}
void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t config, void *channel_data) {
auto samples = std::make_shared<sample_queue_t::element_type>();
auto samples = std::make_shared<sample_queue_t::element_type>(30);
std::thread thread { encodeThread, packets, samples, config, channel_data };
auto fg = util::fail_guard([&]() {
samples->stop();
thread.join();
shutdown_event->view();
});
//FIXME: Pick correct opus_stream_config_t based on config.channels

View File

@ -767,7 +767,7 @@ void start(std::shared_ptr<safe::signal_t> shutdown_event) {
}
}
auto add_cert = std::make_shared<safe::queue_t<crypto::x509_t>>();
auto add_cert = std::make_shared<safe::queue_t<crypto::x509_t>>(30);
// Ugly hack for verifying certificates, see crypto::cert_chain_t::verify() for details
ctx->set_verify_callback([&cert_chain, add_cert](int verified, boost::asio::ssl::verify_context &ctx) {

View File

@ -361,13 +361,10 @@ std::unique_ptr<mic_t> microphone(std::uint32_t sample_rate) {
int status;
const char *audio_sink = nullptr;
const char *audio_sink = "@DEFAULT_MONITOR@";
if(!config::audio.sink.empty()) {
audio_sink = config::audio.sink.c_str();
}
else {
audio_sink = "@DEFAULT_MONITOR@";
}
mic->mic.reset(
pa_simple_new(nullptr, "sunshine", pa_stream_direction_t::PA_STREAM_RECORD, audio_sink, "sunshine-record", &mic->ss, nullptr, nullptr, &status)

View File

@ -319,18 +319,11 @@ public:
}
namespace platf {
class dummy_mic_t : public mic_t {
public:
capture_e sample(std::vector<std::int16_t> &sample_buf) override {
return capture_e::ok;
}
};
std::unique_ptr<mic_t> microphone(std::uint32_t sample_rate) {
auto mic = std::make_unique<audio::mic_wasapi_t>();
if(mic->init(sample_rate)) {
return std::make_unique<dummy_mic_t>();
return nullptr;
}
return mic;

View File

@ -718,9 +718,9 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud
}
int start_broadcast(broadcast_ctx_t &ctx) {
ctx.video_packets = std::make_shared<video::packet_queue_t::element_type>();
ctx.audio_packets = std::make_shared<audio::packet_queue_t::element_type>();
ctx.message_queue_queue = std::make_shared<message_queue_queue_t::element_type>();
ctx.video_packets = std::make_shared<video::packet_queue_t::element_type>(30);
ctx.audio_packets = std::make_shared<audio::packet_queue_t::element_type>(30);
ctx.message_queue_queue = std::make_shared<message_queue_queue_t::element_type>(30);
ctx.video_thread = std::thread { videoBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.video_sock), ctx.video_packets };
ctx.audio_thread = std::thread { audioBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.audio_sock), ctx.audio_packets };
@ -763,7 +763,7 @@ void end_broadcast(broadcast_ctx_t &ctx) {
int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) {
auto constexpr ping = "PING"sv;
auto messages = std::make_shared<message_queue_t::element_type>();
auto messages = std::make_shared<message_queue_t::element_type>(30);
ref->message_queue_queue->raise(type, addr, messages);
auto fg = util::fail_guard([&]() {

View File

@ -26,7 +26,12 @@ public:
return;
}
_status = status_t { std::forward<Args>(args)... };
if constexpr (std::is_same_v<std::optional<T>, status_t>) {
_status = std::make_optional<T>(std::forward<Args>(args)...);
}
else {
_status = status_t { std::forward<Args>(args)... };
}
_cv.notify_all();
}
@ -74,7 +79,7 @@ public:
// pop and view shoud not be used interchangebly
const status_t &view() {
std::unique_lock ul{ _lock };
std::unique_lock ul { _lock };
if (!_continue) {
return util::false_v<status_t>;
@ -130,14 +135,20 @@ class queue_t {
using status_t = util::optional_t<T>;
public:
queue_t(std::uint32_t max_elements) : _max_elements { max_elements } {}
template<class ...Args>
void raise(Args &&... args) {
std::lock_guard lg{_lock};
std::lock_guard ul { _lock };
if(!_continue) {
return;
}
if(_queue.size() == _max_elements) {
_queue.clear();
}
_queue.emplace_back(std::forward<Args>(args)...);
_cv.notify_all();
@ -151,7 +162,7 @@ public:
template<class Rep, class Period>
status_t pop(std::chrono::duration<Rep, Period> delay) {
std::unique_lock ul{_lock};
std::unique_lock ul { _lock };
if (!_continue) {
return util::false_v<status_t>;
@ -170,7 +181,7 @@ public:
}
status_t pop() {
std::unique_lock ul{ _lock };
std::unique_lock ul { _lock };
if (!_continue) {
return util::false_v<status_t>;
@ -191,12 +202,12 @@ public:
}
std::vector<T> &unsafe() {
std::lock_guard { _lock };
std::lock_guard { _lock };
return _queue;
}
void stop() {
std::lock_guard lg{ _lock };
std::lock_guard lg { _lock };
_continue = false;
@ -209,10 +220,12 @@ public:
private:
bool _continue{ true };
bool _continue { true };
std::uint32_t _max_elements;
std::mutex _lock;
std::condition_variable _cv;
std::vector<T> _queue;
};

View File

@ -515,13 +515,7 @@ class buffer_t {
public:
buffer_t() : _els { 0 } {};
buffer_t(buffer_t&&) noexcept = default;
buffer_t &operator=(buffer_t&& other) noexcept {
std::swap(_els, other._els);
_buf = std::move(other._buf);
return *this;
};
buffer_t &operator=(buffer_t&& other) noexcept = default;
explicit buffer_t(size_t elements) : _els { elements }, _buf { std::make_unique<T[]>(elements) } {}
explicit buffer_t(size_t elements, const T &t) : _els { elements }, _buf { std::make_unique<T[]>(elements) } {

View File

@ -238,7 +238,7 @@ struct capture_thread_async_ctx_t {
};
struct capture_thread_sync_ctx_t {
encode_session_ctx_queue_t encode_session_ctx_queue;
encode_session_ctx_queue_t encode_session_ctx_queue { 30 };
};
int start_capture_sync(capture_thread_sync_ctx_t &ctx);
@ -1079,7 +1079,7 @@ bool validate_config(std::shared_ptr<platf::display_t> &disp, const encoder_t &e
session->frame->pict_type = AV_PICTURE_TYPE_I;
auto packets = std::make_shared<packet_queue_t::element_type>();
auto packets = std::make_shared<packet_queue_t::element_type>(30);
if(encode(1, session->ctx, session->frame, packets, nullptr)) {
return false;
}
@ -1282,7 +1282,7 @@ int start_capture_async(capture_thread_async_ctx_t &capture_thread_ctx) {
capture_thread_ctx.encoder_p = &encoders.front();
capture_thread_ctx.reinit_event.reset();
capture_thread_ctx.capture_ctx_queue = std::make_shared<safe::queue_t<capture_ctx_t>>();
capture_thread_ctx.capture_ctx_queue = std::make_shared<safe::queue_t<capture_ctx_t>>(30);
capture_thread_ctx.capture_thread = std::thread {
captureThread,