Fix video/audio thread hanging when Moonlight doesn't ping Sunshine

This commit is contained in:
loki 2019-12-17 21:18:59 +01:00
parent b317258a74
commit 00b2063824

View File

@ -11,6 +11,7 @@
#include <queue>
#include <iostream>
#include <future>
#include <boost/asio.hpp>
#include <moonlight-common-c/enet/include/enet/enet.h>
#include <fstream>
@ -559,7 +560,44 @@ void controlThread(video::idr_event_t idr_events) {
}
}
std::optional<udp::endpoint> recv_peer(udp::socket &sock) {
template<class Stream, class Peer, class BufferSequence>
util::Either<std::size_t, sys::error_code> asio_read(Stream &s, asio::io_service &io, const BufferSequence &bufs, Peer &peer, const asio::deadline_timer::duration_type& expire_time) {
std::optional<sys::error_code> timer_result, read_result;
asio::deadline_timer timer { io };
timer.expires_from_now(boost::posix_time::milliseconds(config::stream.ping_timeout.count()));
timer.async_wait([&](sys::error_code c){
timer_result = c;
});
std::size_t len = 0;
s.async_receive_from(bufs, peer, 0, [&](const boost::system::error_code &ec, size_t bytes) {
len = bytes;
read_result = ec;
});
io.reset();
while(io.run_one()) {
if(read_result) {
timer.cancel();
}
else if(timer_result) {
s.cancel();
}
}
if(*read_result) {
return *read_result;
}
return len;
}
template<class T>
std::optional<udp::endpoint> recv_peer(std::shared_ptr<safe::queue_t<T>> &queue, udp::socket &sock, asio::io_service &io) {
std::array<char, 2048> buf;
char ping[] = {
@ -567,31 +605,25 @@ std::optional<udp::endpoint> recv_peer(udp::socket &sock) {
};
udp::endpoint peer;
while (session.video_packets->running()) {
asio::deadline_timer timer { EXECUTOR((&sock)) };
timer.expires_from_now(boost::posix_time::seconds(2));
timer.async_wait([&](sys::error_code c){
sock.cancel();
});
sys::error_code ping_error;
auto len = sock.receive_from(asio::buffer(buf), peer, 0, ping_error);
if(ping_error == sys::errc::make_error_code(sys::errc::operation_canceled)) {
return {};
while (queue->running()) {
auto len_or_err = asio_read(sock, io, asio::buffer(buf), peer, boost::posix_time::milliseconds(config::stream.ping_timeout.count()));
if(len_or_err.has_right() || len_or_err.left() == 0) {
return std::nullopt;
}
timer.cancel();
auto len = len_or_err.left();
if (len == 4 && !std::memcmp(ping, buf.data(), sizeof(ping))) {
std::cout << "PING from ["sv << peer.address().to_string() << ':' << peer.port() << ']' << std::endl;
return std::make_optional(std::move(peer));;
return std::make_optional(std::move(peer));
}
std::cout << "Unknown transmission: "sv << util::hex_vec(std::string_view{buf.data(), len}) << std::endl;
}
return {};
return std::nullopt;
}
void audioThread() {
@ -600,7 +632,7 @@ void audioThread() {
asio::io_service io;
udp::socket sock{io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT)};
auto peer = recv_peer(sock);
auto peer = recv_peer(session.audio_packets, sock, io);
if(!peer) {
return;
}
@ -621,7 +653,9 @@ void audioThread() {
// std::cout << "Audio ["sv << frame << "] :: send..."sv << std::endl;
}
std::cout << "Audio: Joining()" << std::endl;
captureThread.join();
std::cout << "Audio: Joining()" << std::endl;
}
void videoThread(video::idr_event_t idr_events) {
@ -632,7 +666,7 @@ void videoThread(video::idr_event_t idr_events) {
asio::io_service io;
udp::socket sock{io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT)};
auto peer = recv_peer(sock);
auto peer = recv_peer(session.video_packets, sock, io);
if(!peer) {
return;
}
@ -726,8 +760,9 @@ void videoThread(video::idr_event_t idr_events) {
lowseq += shards.size();
}
std::cout << "Video: Joining()" << std::endl;
captureThread.join();
std::cout << "Video: Joined()" << std::endl;
}
void respond(host_t &host, peer_t peer, msg_t &resp) {
@ -985,13 +1020,17 @@ void rtpThread() {
server.map("PLAY"sv, &cmd_play);
while(true) {
server.iterate(1s);
server.iterate(config::stream.ping_timeout);
if(session.video_packets && !session.video_packets->running()) {
std::cout << "Waiting for Audio to end..."sv << std::endl;
session.audioThread.join();
std::cout << "Waiting for Video to end..."sv << std::endl;
session.videoThread.join();
std::cout << "Waiting for Control to end..."sv << std::endl;
session.controlThread.join();
std::cout << "Resetting Session..."sv << std::endl;
session.video_packets = video::packet_queue_t();
session.audio_packets = audio::packet_queue_t();
}