mirror of
https://github.com/LizardByte/Sunshine.git
synced 2025-02-28 12:40:33 +00:00
Fix lack of audio for second client when multiple clients are connected simultaniously
This commit is contained in:
parent
a93a640d42
commit
8c803e6a34
@ -229,6 +229,14 @@ struct broadcast_ctx_t {
|
||||
|
||||
udp::socket video_sock { io };
|
||||
udp::socket audio_sock { io };
|
||||
|
||||
// This is purely for adminitrative purposes.
|
||||
//
|
||||
// It's possible two instances of Moonlight are behind a NAT.
|
||||
// From Sunshine's point of view, the ip addresses are identical
|
||||
// We need some way to know what ports are already used for different streams
|
||||
util::sync_t<std::vector<std::pair<std::string, std::uint16_t>>> audio_video_connections;
|
||||
|
||||
control_server_t control_server;
|
||||
};
|
||||
|
||||
@ -260,6 +268,11 @@ struct session_t {
|
||||
std::uint32_t avRiKeyId;
|
||||
std::uint32_t timestamp;
|
||||
udp::endpoint peer;
|
||||
|
||||
util::buffer_t<char> shards;
|
||||
util::buffer_t<uint8_t *> shards_p;
|
||||
|
||||
audio_fec_packet_t fec_packet;
|
||||
} audio;
|
||||
|
||||
struct {
|
||||
@ -800,7 +813,7 @@ void recvThread(broadcast_ctx_t &ctx) {
|
||||
|
||||
auto it = peer_to_session.find(peer.address());
|
||||
if(it != std::end(peer_to_session)) {
|
||||
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ":"sv << peer.port() << " :: " << type_str;
|
||||
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
||||
it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes });
|
||||
}
|
||||
};
|
||||
@ -878,14 +891,12 @@ void videoBroadcastThread(udp::socket &sock) {
|
||||
|
||||
auto lastBlockIndex = 0;
|
||||
if(payload.size() > multi_fec_threshold) {
|
||||
BOOST_LOG(debug) << "Generating multiple FEC blocks"sv;
|
||||
BOOST_LOG(verbose) << "Generating multiple FEC blocks"sv;
|
||||
|
||||
// Align individual fec blocks to blocksize
|
||||
auto unaligned_size = payload.size() / MAX_FEC_BLOCKS;
|
||||
auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize;
|
||||
|
||||
BOOST_LOG(fatal) << blocksize << " :: "sv << payload.size() << " :: "sv << aligned_size;
|
||||
|
||||
// Break the data up into 3 blocks, each containing multiple complete video packets.
|
||||
fec_blocks[0] = payload.substr(0, aligned_size);
|
||||
fec_blocks[1] = payload.substr(aligned_size, aligned_size);
|
||||
@ -901,10 +912,6 @@ void videoBroadcastThread(udp::socket &sock) {
|
||||
|
||||
auto blockIndex = 0;
|
||||
std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view ¤t_payload) {
|
||||
if(lastBlockIndex > 0) {
|
||||
BOOST_LOG(fatal) << current_payload.size();
|
||||
}
|
||||
|
||||
auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets);
|
||||
|
||||
// set FEC info now that we know for sure what our percentage will be for this frame
|
||||
@ -961,15 +968,8 @@ void audioBroadcastThread(udp::socket &sock) {
|
||||
auto packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
|
||||
|
||||
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
|
||||
util::buffer_t<char> shards { RTPA_TOTAL_SHARDS * max_block_size };
|
||||
util::buffer_t<uint8_t *> shards_p { RTPA_TOTAL_SHARDS };
|
||||
|
||||
for(auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) {
|
||||
shards_p[x] = (uint8_t *)&shards[x * max_block_size];
|
||||
}
|
||||
|
||||
audio_packet_t audio_packet { (audio_packet_raw_t *)malloc(sizeof(audio_packet_raw_t) + max_block_size) };
|
||||
audio_fec_packet_t audio_fec_packet { (audio_fec_packet_raw_t *)malloc(sizeof(audio_fec_packet_raw_t) + max_block_size) };
|
||||
fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) };
|
||||
|
||||
// For unknown reasons, the RS parity matrix computed by our RS implementation
|
||||
@ -985,14 +985,6 @@ void audioBroadcastThread(udp::socket &sock) {
|
||||
audio_packet->rtp.packetType = 97;
|
||||
audio_packet->rtp.ssrc = 0;
|
||||
|
||||
audio_fec_packet->rtp.header = 0x80;
|
||||
audio_fec_packet->rtp.packetType = 127;
|
||||
audio_fec_packet->rtp.timestamp = 0;
|
||||
audio_fec_packet->rtp.ssrc = 0;
|
||||
|
||||
audio_fec_packet->fecHeader.payloadType = audio_packet->rtp.packetType;
|
||||
audio_fec_packet->fecHeader.ssrc = audio_packet->rtp.ssrc;
|
||||
|
||||
while(auto packet = packets->pop()) {
|
||||
if(shutdown_event->peek()) {
|
||||
break;
|
||||
@ -1020,16 +1012,19 @@ void audioBroadcastThread(udp::socket &sock) {
|
||||
session->audio.sequenceNumber++;
|
||||
session->audio.timestamp += session->config.audio.packetDuration;
|
||||
|
||||
auto &shards_p = session->audio.shards_p;
|
||||
|
||||
std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]);
|
||||
sock.send_to(asio::buffer((char *)audio_packet.get(), sizeof(audio_packet_raw_t) + bytes), session->audio.peer);
|
||||
|
||||
|
||||
BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv;
|
||||
|
||||
auto &fec_packet = session->audio.fec_packet;
|
||||
// initialize the FEC header at the beginning of the FEC block
|
||||
if(sequenceNumber % RTPA_DATA_SHARDS == 0) {
|
||||
audio_fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
|
||||
audio_fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp);
|
||||
fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
|
||||
fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp);
|
||||
}
|
||||
|
||||
// generate parity shards at the end of the FEC block
|
||||
@ -1037,10 +1032,10 @@ void audioBroadcastThread(udp::socket &sock) {
|
||||
reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes);
|
||||
|
||||
for(auto x = 0; x < RTPA_FEC_SHARDS; ++x) {
|
||||
audio_fec_packet->rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
|
||||
audio_fec_packet->fecHeader.fecShardIndex = x;
|
||||
memcpy(audio_fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes);
|
||||
sock.send_to(asio::buffer((char *)audio_fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer);
|
||||
fec_packet->rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
|
||||
fec_packet->fecHeader.fecShardIndex = x;
|
||||
memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes);
|
||||
sock.send_to(asio::buffer((char *)fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer);
|
||||
BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv;
|
||||
}
|
||||
}
|
||||
@ -1134,76 +1129,105 @@ void end_broadcast(broadcast_ctx_t &ctx) {
|
||||
broadcast_shutdown_event->reset();
|
||||
}
|
||||
|
||||
int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) {
|
||||
int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, udp::endpoint &peer, std::chrono::milliseconds timeout) {
|
||||
auto constexpr ping = "PING"sv;
|
||||
|
||||
auto messages = std::make_shared<message_queue_t::element_type>(30);
|
||||
ref->message_queue_queue->raise(type, addr, messages);
|
||||
ref->message_queue_queue->raise(type, peer.address(), messages);
|
||||
|
||||
auto fg = util::fail_guard([&]() {
|
||||
messages->stop();
|
||||
|
||||
// remove message queue from session
|
||||
ref->message_queue_queue->raise(type, addr, nullptr);
|
||||
ref->message_queue_queue->raise(type, peer.address(), nullptr);
|
||||
});
|
||||
|
||||
auto msg_opt = messages->pop(config::stream.ping_timeout);
|
||||
messages->stop();
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
auto current_time = start_time;
|
||||
|
||||
if(!msg_opt) {
|
||||
BOOST_LOG(error) << "Initial Ping Timeout"sv;
|
||||
while(current_time - start_time < config::stream.ping_timeout) {
|
||||
auto delta_time = current_time - start_time;
|
||||
|
||||
return -1;
|
||||
auto msg_opt = messages->pop(config::stream.ping_timeout - delta_time);
|
||||
if(!msg_opt) {
|
||||
break;
|
||||
}
|
||||
|
||||
TUPLE_2D_REF(port, msg, *msg_opt);
|
||||
if(msg == ping) {
|
||||
BOOST_LOG(debug) << "Received ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']';
|
||||
|
||||
// Update connection details.
|
||||
{
|
||||
auto addr_str = peer.address().to_string();
|
||||
|
||||
auto &connections = ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
std::remove_reference_t<decltype(*connections)>::iterator pos = std::end(*connections);
|
||||
|
||||
for(auto it = std::begin(*connections); it != std::end(*connections); ++it) {
|
||||
TUPLE_2D_REF(addr, port_ref, *it);
|
||||
|
||||
if(!port_ref && addr_str == addr) {
|
||||
pos = it;
|
||||
}
|
||||
else if(port_ref == port) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(pos == std::end(*connections)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pos->second = port;
|
||||
peer.port(port);
|
||||
}
|
||||
|
||||
return port;
|
||||
}
|
||||
|
||||
BOOST_LOG(debug) << "Received non-ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']';
|
||||
|
||||
current_time = std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
TUPLE_2D_REF(port, msg, *msg_opt);
|
||||
if(msg != ping) {
|
||||
BOOST_LOG(error) << "First message is not a PING";
|
||||
BOOST_LOG(debug) << "Received from "sv << addr << ':' << port << " ["sv << util::hex_vec(msg) << ']';
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return port;
|
||||
BOOST_LOG(error) << "Initial Ping Timeout"sv;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void videoThread(session_t *session, std::string addr_str) {
|
||||
void videoThread(session_t *session) {
|
||||
auto fg = util::fail_guard([&]() {
|
||||
session::stop(*session);
|
||||
});
|
||||
|
||||
while_starting_do_nothing(session->state);
|
||||
|
||||
auto addr = asio::ip::make_address(addr_str);
|
||||
auto ref = broadcast.ref();
|
||||
auto port = recv_ping(ref, socket_e::video, addr, config::stream.ping_timeout);
|
||||
auto port = recv_ping(ref, socket_e::video, session->video.peer, config::stream.ping_timeout);
|
||||
if(port < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
session->video.peer.address(addr);
|
||||
session->video.peer.port(port);
|
||||
|
||||
BOOST_LOG(debug) << "Start capturing Video"sv;
|
||||
video::capture(session->mail, session->config.monitor, session);
|
||||
}
|
||||
|
||||
void audioThread(session_t *session, std::string addr_str) {
|
||||
void audioThread(session_t *session) {
|
||||
auto fg = util::fail_guard([&]() {
|
||||
session::stop(*session);
|
||||
});
|
||||
|
||||
while_starting_do_nothing(session->state);
|
||||
|
||||
auto addr = asio::ip::make_address(addr_str);
|
||||
|
||||
auto ref = broadcast.ref();
|
||||
auto port = recv_ping(ref, socket_e::audio, addr, config::stream.ping_timeout);
|
||||
auto port = recv_ping(ref, socket_e::audio, session->audio.peer, config::stream.ping_timeout);
|
||||
if(port < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
session->audio.peer.address(addr);
|
||||
session->audio.peer.port(port);
|
||||
|
||||
BOOST_LOG(debug) << "Start capturing Audio"sv;
|
||||
audio::capture(session->mail, session->config.audio, session);
|
||||
}
|
||||
@ -1234,6 +1258,42 @@ void join(session_t &session) {
|
||||
//Reset input on session stop to avoid stuck repeated keys
|
||||
BOOST_LOG(debug) << "Resetting Input..."sv;
|
||||
input::reset(session.input);
|
||||
|
||||
BOOST_LOG(debug) << "Removing references to any connections..."sv;
|
||||
{
|
||||
auto video_addr = session.video.peer.address().to_string();
|
||||
auto audio_addr = session.audio.peer.address().to_string();
|
||||
|
||||
auto video_port = session.video.peer.port();
|
||||
auto audio_port = session.audio.peer.port();
|
||||
|
||||
auto &connections = session.broadcast_ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
auto validate_size = connections->size();
|
||||
for(auto it = std::begin(*connections); it != std::end(*connections);) {
|
||||
TUPLE_2D_REF(addr, port, *it);
|
||||
|
||||
if((video_port == port && video_addr == addr) ||
|
||||
(audio_port == port && audio_addr == addr)) {
|
||||
it = connections->erase(it);
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
auto new_size = connections->size();
|
||||
if(validate_size != new_size + 2) {
|
||||
BOOST_LOG(warning) << "Couldn't remove reference to session connections: ending all broadcasts"sv;
|
||||
|
||||
// A reference to the event object is still stored somewhere else. So no need to keep
|
||||
// a reference to it.
|
||||
mail::man->event<bool>(mail::broadcast_shutdown)->raise(true);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_LOG(debug) << "Session ended"sv;
|
||||
}
|
||||
|
||||
@ -1247,10 +1307,28 @@ int start(session_t &session, const std::string &addr_string) {
|
||||
|
||||
session.broadcast_ref->control_server.emplace_addr_to_session(addr_string, session);
|
||||
|
||||
auto addr = boost::asio::ip::make_address(addr_string);
|
||||
session.video.peer.address(addr);
|
||||
session.video.peer.port(0);
|
||||
|
||||
session.audio.peer.address(addr);
|
||||
session.audio.peer.port(0);
|
||||
|
||||
{
|
||||
auto &connections = session.broadcast_ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
// allocate a location for connections
|
||||
connections->emplace_back(addr_string, 0);
|
||||
connections->emplace_back(addr_string, 0);
|
||||
}
|
||||
|
||||
|
||||
session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
|
||||
|
||||
session.audioThread = std::thread { audioThread, &session, addr_string };
|
||||
session.videoThread = std::thread { videoThread, &session, addr_string };
|
||||
session.audioThread = std::thread { audioThread, &session };
|
||||
session.videoThread = std::thread { videoThread, &session };
|
||||
|
||||
session.state.store(state_e::RUNNING, std::memory_order_relaxed);
|
||||
|
||||
@ -1274,6 +1352,30 @@ std::shared_ptr<session_t> alloc(config_t &config, crypto::aes_t &gcm_key, crypt
|
||||
session->video.idr_events = mail->event<bool>(mail::idr);
|
||||
session->video.lowseq = 0;
|
||||
|
||||
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
|
||||
|
||||
util::buffer_t<char> shards { RTPA_TOTAL_SHARDS * max_block_size };
|
||||
util::buffer_t<uint8_t *> shards_p { RTPA_TOTAL_SHARDS };
|
||||
|
||||
for(auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) {
|
||||
shards_p[x] = (uint8_t *)&shards[x * max_block_size];
|
||||
}
|
||||
|
||||
// Audio FEC spans multiple audio packets,
|
||||
// therefore its session specific
|
||||
session->audio.shards = std::move(shards);
|
||||
session->audio.shards_p = std::move(shards_p);
|
||||
|
||||
session->audio.fec_packet.reset((audio_fec_packet_raw_t *)malloc(sizeof(audio_fec_packet_raw_t) + max_block_size));
|
||||
|
||||
session->audio.fec_packet->rtp.header = 0x80;
|
||||
session->audio.fec_packet->rtp.packetType = 127;
|
||||
session->audio.fec_packet->rtp.timestamp = 0;
|
||||
session->audio.fec_packet->rtp.ssrc = 0;
|
||||
|
||||
session->audio.fec_packet->fecHeader.payloadType = 97;
|
||||
session->audio.fec_packet->fecHeader.ssrc = 0;
|
||||
|
||||
session->audio.cipher = crypto::cipher::cbc_t {
|
||||
gcm_key, true
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user