mirror of
https://github.com/LizardByte/Sunshine.git
synced 2025-01-30 03:32:43 +00:00
Implement new protocol extension to match client connections together
Fixes #1804 Fixes #1862 Fixes #1852
This commit is contained in:
parent
11d472362c
commit
a9f2679a49
@ -290,6 +290,12 @@ namespace nvhttp {
|
||||
launch_session.gcmap = util::from_view(get_arg(args, "gcmap", "0"));
|
||||
launch_session.enable_hdr = util::from_view(get_arg(args, "hdrMode", "0"));
|
||||
|
||||
// Generate the unique identifiers for this connection that we will send later during RTSP handshake
|
||||
unsigned char raw_payload[8];
|
||||
RAND_bytes(raw_payload, sizeof(raw_payload));
|
||||
launch_session.av_ping_payload = util::hex_vec(raw_payload);
|
||||
RAND_bytes((unsigned char *) &launch_session.control_connect_data, sizeof(launch_session.control_connect_data));
|
||||
|
||||
uint32_t prepend_iv = util::endian::big<uint32_t>(util::from_view(get_arg(args, "rikeyid")));
|
||||
auto prepend_iv_p = (uint8_t *) &prepend_iv;
|
||||
|
||||
|
30
src/rtsp.cpp
30
src/rtsp.cpp
@ -563,17 +563,26 @@ namespace rtsp_stream {
|
||||
|
||||
void
|
||||
cmd_setup(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
|
||||
OPTION_ITEM options[3] {};
|
||||
OPTION_ITEM options[4] {};
|
||||
|
||||
auto &seqn = options[0];
|
||||
auto &session_option = options[1];
|
||||
auto &port_option = options[2];
|
||||
auto &payload_option = options[3];
|
||||
|
||||
seqn.option = const_cast<char *>("CSeq");
|
||||
|
||||
auto seqn_str = std::to_string(req->sequenceNumber);
|
||||
seqn.content = const_cast<char *>(seqn_str.c_str());
|
||||
|
||||
if (!server->launch_event.peek()) {
|
||||
// /launch has not been used
|
||||
|
||||
respond(sock, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
|
||||
return;
|
||||
}
|
||||
auto launch_session { server->launch_event.view() };
|
||||
|
||||
std::string_view target { req->message.request.target };
|
||||
auto begin = std::find(std::begin(target), std::end(target), '=') + 1;
|
||||
auto end = std::find(begin, std::end(target), '/');
|
||||
@ -608,6 +617,19 @@ namespace rtsp_stream {
|
||||
port_option.option = const_cast<char *>("Transport");
|
||||
port_option.content = port_value.data();
|
||||
|
||||
// Send identifiers that will be echoed in the other connections
|
||||
auto connect_data = std::to_string(launch_session->control_connect_data);
|
||||
if (type == "control"sv) {
|
||||
payload_option.option = const_cast<char *>("X-SS-Connect-Data");
|
||||
payload_option.content = connect_data.data();
|
||||
}
|
||||
else {
|
||||
payload_option.option = const_cast<char *>("X-SS-Ping-Payload");
|
||||
payload_option.content = launch_session->av_ping_payload.data();
|
||||
}
|
||||
|
||||
port_option.next = &payload_option;
|
||||
|
||||
respond(sock, &seqn, 200, "OK", req->sequenceNumber, {});
|
||||
}
|
||||
|
||||
@ -679,6 +701,7 @@ namespace rtsp_stream {
|
||||
args.try_emplace("x-nv-general.useReliableUdp"sv, "1"sv);
|
||||
args.try_emplace("x-nv-vqos[0].fec.minRequiredFecPackets"sv, "0"sv);
|
||||
args.try_emplace("x-nv-general.featureFlags"sv, "135"sv);
|
||||
args.try_emplace("x-ml-general.featureFlags"sv, "0"sv);
|
||||
args.try_emplace("x-nv-vqos[0].qosTrafficType"sv, "5"sv);
|
||||
args.try_emplace("x-nv-aqos.qosTrafficType"sv, "4"sv);
|
||||
|
||||
@ -696,7 +719,8 @@ namespace rtsp_stream {
|
||||
config.controlProtocolType = util::from_view(args.at("x-nv-general.useReliableUdp"sv));
|
||||
config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv));
|
||||
config.minRequiredFecPackets = util::from_view(args.at("x-nv-vqos[0].fec.minRequiredFecPackets"sv));
|
||||
config.featureFlags = util::from_view(args.at("x-nv-general.featureFlags"sv));
|
||||
config.nvFeatureFlags = util::from_view(args.at("x-nv-general.featureFlags"sv));
|
||||
config.mlFeatureFlags = util::from_view(args.at("x-ml-general.featureFlags"sv));
|
||||
config.audioQosType = util::from_view(args.at("x-nv-aqos.qosTrafficType"sv));
|
||||
config.videoQosType = util::from_view(args.at("x-nv-vqos[0].qosTrafficType"sv));
|
||||
|
||||
@ -742,7 +766,7 @@ namespace rtsp_stream {
|
||||
return;
|
||||
}
|
||||
|
||||
auto session = stream::session::alloc(config, launch_session->gcm_key, launch_session->iv);
|
||||
auto session = stream::session::alloc(config, launch_session->gcm_key, launch_session->iv, launch_session->av_ping_payload, launch_session->control_connect_data);
|
||||
|
||||
auto slot = server->accept(session);
|
||||
if (!slot) {
|
||||
|
@ -16,6 +16,9 @@ namespace rtsp_stream {
|
||||
crypto::aes_t gcm_key;
|
||||
crypto::aes_t iv;
|
||||
|
||||
std::string av_ping_payload;
|
||||
uint32_t control_connect_data;
|
||||
|
||||
bool host_audio;
|
||||
std::string unique_id;
|
||||
int width;
|
||||
|
299
src/stream.cpp
299
src/stream.cpp
@ -13,8 +13,7 @@
|
||||
#include <boost/endian/arithmetic.hpp>
|
||||
|
||||
extern "C" {
|
||||
#include <moonlight-common-c/src/RtpAudioQueue.h>
|
||||
#include <moonlight-common-c/src/Video.h>
|
||||
#include <moonlight-common-c/src/Limelight-internal.h>
|
||||
#include <rs.h>
|
||||
}
|
||||
|
||||
@ -229,8 +228,9 @@ namespace stream {
|
||||
using audio_fec_packet_t = util::c_ptr<audio_fec_packet_raw_t>;
|
||||
using audio_aes_t = std::array<char, round_to_pkcs7_padded(MAX_AUDIO_PACKET_SIZE)>;
|
||||
|
||||
using message_queue_t = std::shared_ptr<safe::queue_t<std::pair<std::uint16_t, std::string>>>;
|
||||
using message_queue_queue_t = std::shared_ptr<safe::queue_t<std::tuple<socket_e, asio::ip::address, message_queue_t>>>;
|
||||
using av_session_id_t = std::variant<asio::ip::address, std::string>; // IP address or SS-Ping-Payload from RTSP handshake
|
||||
using message_queue_t = std::shared_ptr<safe::queue_t<std::pair<udp::endpoint, std::string>>>;
|
||||
using message_queue_queue_t = std::shared_ptr<safe::queue_t<std::tuple<socket_e, av_session_id_t, message_queue_t>>>;
|
||||
|
||||
// return bytes written on success
|
||||
// return -1 on error
|
||||
@ -264,18 +264,11 @@ namespace stream {
|
||||
return !(bool) _host;
|
||||
}
|
||||
|
||||
void
|
||||
emplace_addr_to_session(const std::string &addr, session_t &session) {
|
||||
auto lg = _map_addr_session.lock();
|
||||
|
||||
_map_addr_session->emplace(addr, std::make_pair(0u, &session));
|
||||
}
|
||||
|
||||
// Get session associated with address.
|
||||
// If none are found, try to find a session not yet claimed. (It will be marked by a port of value 0
|
||||
// If none of those are found, return nullptr
|
||||
session_t *
|
||||
get_session(const net::peer_t peer);
|
||||
get_session(const net::peer_t peer, uint32_t connect_data);
|
||||
|
||||
// Circular dependency:
|
||||
// iterate refers to session
|
||||
@ -313,8 +306,11 @@ namespace stream {
|
||||
// Callbacks
|
||||
std::unordered_map<std::uint16_t, std::function<void(session_t *, const std::string_view &)>> _map_type_cb;
|
||||
|
||||
// Mapping ip:port to session
|
||||
sync_util::sync_t<std::unordered_multimap<std::string, std::pair<std::uint16_t, session_t *>>> _map_addr_session;
|
||||
// All active sessions (including those still waiting for a peer to connect)
|
||||
sync_util::sync_t<std::vector<session_t *>> _sessions;
|
||||
|
||||
// ENet peer to session mapping for sessions with a peer connected
|
||||
sync_util::sync_t<std::map<net::peer_t, session_t *>> _peer_to_session;
|
||||
|
||||
ENetAddress _addr;
|
||||
net::host_t _host;
|
||||
@ -333,12 +329,6 @@ namespace stream {
|
||||
udp::socket video_sock { io };
|
||||
udp::socket audio_sock { io };
|
||||
|
||||
// This is purely for administrative purposes.
|
||||
// It's possible two instances of Moonlight are behind a NAT.
|
||||
// From Sunshine's point of view, the ip addresses are identical
|
||||
// We need some way to know what ports are already used for different streams
|
||||
sync_util::sync_t<std::vector<std::pair<std::string, std::uint16_t>>> audio_video_connections;
|
||||
|
||||
control_server_t control_server;
|
||||
};
|
||||
|
||||
@ -359,15 +349,20 @@ namespace stream {
|
||||
boost::asio::ip::address localAddress;
|
||||
|
||||
struct {
|
||||
std::string ping_payload;
|
||||
|
||||
int lowseq;
|
||||
udp::endpoint peer;
|
||||
|
||||
safe::mail_raw_t::event_t<bool> idr_events;
|
||||
safe::mail_raw_t::event_t<std::pair<int64_t, int64_t>> invalidate_ref_frames_events;
|
||||
|
||||
std::unique_ptr<platf::deinit_t> qos;
|
||||
} video;
|
||||
|
||||
struct {
|
||||
crypto::cipher::cbc_t cipher;
|
||||
std::string ping_payload;
|
||||
|
||||
std::uint16_t sequenceNumber;
|
||||
// avRiKeyId == util::endian::big(First (sizeof(avRiKeyId)) bytes of launch_session->iv)
|
||||
@ -386,6 +381,9 @@ namespace stream {
|
||||
crypto::cipher::gcm_t cipher;
|
||||
crypto::aes_t iv;
|
||||
|
||||
uint32_t connect_data; // Used for new clients with ML_FF_SESSION_ID_V1
|
||||
std::string expected_peer_address; // Only used for legacy clients without ML_FF_SESSION_ID_V1
|
||||
|
||||
net::peer_t peer;
|
||||
std::uint8_t seq;
|
||||
|
||||
@ -445,29 +443,47 @@ namespace stream {
|
||||
static auto broadcast = safe::make_shared<broadcast_ctx_t>(start_broadcast, end_broadcast);
|
||||
|
||||
session_t *
|
||||
control_server_t::get_session(const net::peer_t peer) {
|
||||
TUPLE_2D(port, addr_string, platf::from_sockaddr_ex((sockaddr *) &peer->address.address));
|
||||
|
||||
auto lg = _map_addr_session.lock();
|
||||
TUPLE_2D(begin, end, _map_addr_session->equal_range(addr_string));
|
||||
|
||||
auto it = std::end(_map_addr_session.raw);
|
||||
for (auto pos = begin; pos != end; ++pos) {
|
||||
TUPLE_2D_REF(session_port, session_p, pos->second);
|
||||
|
||||
if (port == session_port) {
|
||||
return session_p;
|
||||
}
|
||||
else if (session_port == 0) {
|
||||
it = pos;
|
||||
control_server_t::get_session(const net::peer_t peer, uint32_t connect_data) {
|
||||
{
|
||||
// Fast path - look up existing session by peer
|
||||
auto lg = _peer_to_session.lock();
|
||||
auto it = _peer_to_session->find(peer);
|
||||
if (it != _peer_to_session->end()) {
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
if (it != std::end(_map_addr_session.raw)) {
|
||||
TUPLE_2D_REF(session_port, session_p, it->second);
|
||||
// Slow path - process new session
|
||||
TUPLE_2D(peer_port, peer_addr, platf::from_sockaddr_ex((sockaddr *) &peer->address.address));
|
||||
auto lg = _sessions.lock();
|
||||
for (auto pos = std::begin(*_sessions); pos != std::end(*_sessions); ++pos) {
|
||||
auto session_p = *pos;
|
||||
|
||||
// Skip sessions that are already established
|
||||
if (session_p->control.peer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Identify the connection by the unique connect data if the client supports it.
|
||||
// Only fall back to IP address matching for clients without session ID support.
|
||||
if (session_p->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) {
|
||||
if (session_p->control.connect_data != connect_data) {
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
BOOST_LOG(debug) << "Initialized new control stream session by connect data match [v2]"sv;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (session_p->control.expected_peer_address != peer_addr) {
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
BOOST_LOG(debug) << "Initialized new control stream session by IP address match [v1]"sv;
|
||||
}
|
||||
}
|
||||
|
||||
session_p->control.peer = peer;
|
||||
session_port = port;
|
||||
|
||||
// Use the local address from the control connection as the source address
|
||||
// for other communications to the client. This is necessary to ensure
|
||||
@ -475,6 +491,12 @@ namespace stream {
|
||||
auto local_address = platf::from_sockaddr((sockaddr *) &peer->localAddress.address);
|
||||
session_p->localAddress = boost::asio::ip::make_address(local_address);
|
||||
|
||||
BOOST_LOG(debug) << "Control local address ["sv << local_address << ']';
|
||||
BOOST_LOG(debug) << "Control peer address ["sv << peer_addr << ':' << peer_port << ']';
|
||||
|
||||
// Insert this into the map for O(1) lookups in the future
|
||||
auto ptslg = _peer_to_session.lock();
|
||||
_peer_to_session->emplace(peer, session_p);
|
||||
return session_p;
|
||||
}
|
||||
|
||||
@ -502,7 +524,7 @@ namespace stream {
|
||||
auto res = enet_host_service(_host.get(), &event, timeout.count());
|
||||
|
||||
if (res > 0) {
|
||||
auto session = get_session(event.peer);
|
||||
auto session = get_session(event.peer, event.data);
|
||||
if (!session) {
|
||||
BOOST_LOG(warning) << "Rejected connection from ["sv << platf::from_sockaddr((sockaddr *) &event.peer->address.address) << "]: it's not properly set up"sv;
|
||||
enet_peer_disconnect_now(event.peer, 0);
|
||||
@ -936,23 +958,28 @@ namespace stream {
|
||||
bool has_session_awaiting_peer = false;
|
||||
|
||||
{
|
||||
auto lg = server->_map_addr_session.lock();
|
||||
auto lg = server->_sessions.lock();
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
KITTY_WHILE_LOOP(auto pos = std::begin(*server->_map_addr_session), pos != std::end(*server->_map_addr_session), {
|
||||
TUPLE_2D_REF(addr, port_session, *pos);
|
||||
auto session = port_session.second;
|
||||
KITTY_WHILE_LOOP(auto pos = std::begin(*server->_sessions), pos != std::end(*server->_sessions), {
|
||||
auto session = *pos;
|
||||
|
||||
if (now > session->pingTimeout) {
|
||||
BOOST_LOG(info) << addr << ": Ping Timeout"sv;
|
||||
auto address = session->control.peer ? platf::from_sockaddr((sockaddr *) &session->control.peer->address.address) : session->control.expected_peer_address;
|
||||
BOOST_LOG(info) << address << ": Ping Timeout"sv;
|
||||
session::stop(*session);
|
||||
}
|
||||
|
||||
if (session->state.load(std::memory_order_acquire) == session::state_e::STOPPING) {
|
||||
pos = server->_map_addr_session->erase(pos);
|
||||
pos = server->_sessions->erase(pos);
|
||||
|
||||
if (session->control.peer) {
|
||||
{
|
||||
auto ptslg = server->_peer_to_session.lock();
|
||||
server->_peer_to_session->erase(session->control.peer);
|
||||
}
|
||||
|
||||
enet_peer_disconnect_now(session->control.peer, 0);
|
||||
}
|
||||
|
||||
@ -1008,9 +1035,9 @@ namespace stream {
|
||||
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
||||
encrypted_payload;
|
||||
|
||||
auto lg = server->_map_addr_session.lock();
|
||||
for (auto pos = std::begin(*server->_map_addr_session); pos != std::end(*server->_map_addr_session); ++pos) {
|
||||
auto session = pos->second.second;
|
||||
auto lg = server->_sessions.lock();
|
||||
for (auto pos = std::begin(*server->_sessions); pos != std::end(*server->_sessions); ++pos) {
|
||||
auto session = *pos;
|
||||
|
||||
// We may not have gotten far enough to have an ENet connection yet
|
||||
if (session->control.peer) {
|
||||
@ -1031,8 +1058,8 @@ namespace stream {
|
||||
|
||||
void
|
||||
recvThread(broadcast_ctx_t &ctx) {
|
||||
std::map<asio::ip::address, message_queue_t> peer_to_video_session;
|
||||
std::map<asio::ip::address, message_queue_t> peer_to_audio_session;
|
||||
std::map<av_session_id_t, message_queue_t> peer_to_video_session;
|
||||
std::map<av_session_id_t, message_queue_t> peer_to_audio_session;
|
||||
|
||||
auto &video_sock = ctx.video_sock;
|
||||
auto &audio_sock = ctx.audio_sock;
|
||||
@ -1050,30 +1077,30 @@ namespace stream {
|
||||
auto populate_peer_to_session = [&]() {
|
||||
while (message_queue_queue->peek()) {
|
||||
auto message_queue_opt = message_queue_queue->pop();
|
||||
TUPLE_3D_REF(socket_type, addr, message_queue, *message_queue_opt);
|
||||
TUPLE_3D_REF(socket_type, session_id, message_queue, *message_queue_opt);
|
||||
|
||||
switch (socket_type) {
|
||||
case socket_e::video:
|
||||
if (message_queue) {
|
||||
peer_to_video_session.emplace(addr, message_queue);
|
||||
peer_to_video_session.emplace(session_id, message_queue);
|
||||
}
|
||||
else {
|
||||
peer_to_video_session.erase(addr);
|
||||
peer_to_video_session.erase(session_id);
|
||||
}
|
||||
break;
|
||||
case socket_e::audio:
|
||||
if (message_queue) {
|
||||
peer_to_audio_session.emplace(addr, message_queue);
|
||||
peer_to_audio_session.emplace(session_id, message_queue);
|
||||
}
|
||||
else {
|
||||
peer_to_audio_session.erase(addr);
|
||||
peer_to_audio_session.erase(session_id);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map<asio::ip::address, message_queue_t> &peer_to_session) {
|
||||
auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map<av_session_id_t, message_queue_t> &peer_to_session) {
|
||||
recv_func[buf_elem] = [&, buf_elem](const boost::system::error_code &ec, size_t bytes) {
|
||||
auto fg = util::fail_guard([&]() {
|
||||
sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func[buf_elem]);
|
||||
@ -1094,10 +1121,23 @@ namespace stream {
|
||||
return;
|
||||
}
|
||||
|
||||
auto it = peer_to_session.find(peer.address());
|
||||
if (it != std::end(peer_to_session)) {
|
||||
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
||||
it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes });
|
||||
if (bytes == 4) {
|
||||
// For legacy PING packets, find the matching session by address.
|
||||
auto it = peer_to_session.find(peer.address());
|
||||
if (it != std::end(peer_to_session)) {
|
||||
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
||||
it->second->raise(peer, std::string { buf[buf_elem].data(), bytes });
|
||||
}
|
||||
}
|
||||
else if (bytes >= sizeof(SS_PING)) {
|
||||
auto ping = (PSS_PING) buf[buf_elem].data();
|
||||
|
||||
// For new PING packets that include a client identifier, search by payload.
|
||||
auto it = peer_to_session.find(std::string { ping->payload, sizeof(ping->payload) });
|
||||
if (it != std::end(peer_to_session)) {
|
||||
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
||||
it->second->raise(peer, std::string { buf[buf_elem].data(), bytes });
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
@ -1375,7 +1415,7 @@ namespace stream {
|
||||
// For now, encode_audio needs it to be the proper sequenceNumber
|
||||
audio_packet->rtp.sequenceNumber = sequenceNumber;
|
||||
|
||||
auto bytes = encode_audio(session->config.featureFlags, packet_data, audio_packet, session->audio.avRiKeyId, session->audio.cipher);
|
||||
auto bytes = encode_audio(session->config.nvFeatureFlags, packet_data, audio_packet, session->audio.avRiKeyId, session->audio.cipher);
|
||||
if (bytes < 0) {
|
||||
BOOST_LOG(error) << "Couldn't encode audio packet"sv;
|
||||
break;
|
||||
@ -1531,17 +1571,24 @@ namespace stream {
|
||||
}
|
||||
|
||||
int
|
||||
recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, udp::endpoint &peer, std::chrono::milliseconds timeout) {
|
||||
auto constexpr ping = "PING"sv;
|
||||
|
||||
recv_ping(session_t *session, decltype(broadcast)::ptr_t ref, socket_e type, std::string_view expected_payload, udp::endpoint &peer, std::chrono::milliseconds timeout) {
|
||||
auto messages = std::make_shared<message_queue_t::element_type>(30);
|
||||
ref->message_queue_queue->raise(type, peer.address(), messages);
|
||||
av_session_id_t session_id = std::string { expected_payload };
|
||||
|
||||
// Only allow matches on the peer address for legacy clients
|
||||
if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
|
||||
ref->message_queue_queue->raise(type, peer.address(), messages);
|
||||
}
|
||||
ref->message_queue_queue->raise(type, session_id, messages);
|
||||
|
||||
auto fg = util::fail_guard([&]() {
|
||||
messages->stop();
|
||||
|
||||
// remove message queue from session
|
||||
ref->message_queue_queue->raise(type, peer.address(), nullptr);
|
||||
if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
|
||||
ref->message_queue_queue->raise(type, peer.address(), nullptr);
|
||||
}
|
||||
ref->message_queue_queue->raise(type, session_id, nullptr);
|
||||
});
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
@ -1555,45 +1602,24 @@ namespace stream {
|
||||
break;
|
||||
}
|
||||
|
||||
TUPLE_2D_REF(port, msg, *msg_opt);
|
||||
if (msg == ping) {
|
||||
BOOST_LOG(debug) << "Received ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']';
|
||||
|
||||
// Update connection details.
|
||||
{
|
||||
auto addr_str = peer.address().to_string();
|
||||
|
||||
auto &connections = ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
std::remove_reference_t<decltype(*connections)>::iterator pos = std::end(*connections);
|
||||
|
||||
for (auto it = std::begin(*connections); it != std::end(*connections); ++it) {
|
||||
TUPLE_2D_REF(addr, port_ref, *it);
|
||||
|
||||
if (!port_ref && addr_str == addr) {
|
||||
pos = it;
|
||||
}
|
||||
else if (port_ref == port) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pos == std::end(*connections)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pos->second = port;
|
||||
peer.port(port);
|
||||
}
|
||||
|
||||
return port;
|
||||
TUPLE_2D_REF(recv_peer, msg, *msg_opt);
|
||||
if (msg.find(expected_payload) != std::string::npos) {
|
||||
// Match the new PING payload format
|
||||
BOOST_LOG(debug) << "Received ping [v2] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
||||
}
|
||||
else if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) && msg == "PING"sv) {
|
||||
// Match the legacy fixed PING payload only if the new type is not supported
|
||||
BOOST_LOG(debug) << "Received ping [v1] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
||||
}
|
||||
else {
|
||||
BOOST_LOG(debug) << "Received non-ping from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
||||
current_time = std::chrono::steady_clock::now();
|
||||
continue;
|
||||
}
|
||||
|
||||
BOOST_LOG(debug) << "Received non-ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']';
|
||||
|
||||
current_time = std::chrono::steady_clock::now();
|
||||
// Update connection details.
|
||||
peer = recv_peer;
|
||||
return 0;
|
||||
}
|
||||
|
||||
BOOST_LOG(error) << "Initial Ping Timeout"sv;
|
||||
@ -1609,8 +1635,8 @@ namespace stream {
|
||||
while_starting_do_nothing(session->state);
|
||||
|
||||
auto ref = broadcast.ref();
|
||||
auto port = recv_ping(ref, socket_e::video, session->video.peer, config::stream.ping_timeout);
|
||||
if (port < 0) {
|
||||
auto error = recv_ping(session, ref, socket_e::video, session->video.ping_payload, session->video.peer, config::stream.ping_timeout);
|
||||
if (error < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1634,8 +1660,8 @@ namespace stream {
|
||||
while_starting_do_nothing(session->state);
|
||||
|
||||
auto ref = broadcast.ref();
|
||||
auto port = recv_ping(ref, socket_e::audio, session->audio.peer, config::stream.ping_timeout);
|
||||
if (port < 0) {
|
||||
auto error = recv_ping(session, ref, socket_e::audio, session->audio.ping_payload, session->audio.peer, config::stream.ping_timeout);
|
||||
if (error < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1696,41 +1722,6 @@ namespace stream {
|
||||
BOOST_LOG(debug) << "Resetting Input..."sv;
|
||||
input::reset(session.input);
|
||||
|
||||
BOOST_LOG(debug) << "Removing references to any connections..."sv;
|
||||
{
|
||||
auto video_addr = session.video.peer.address().to_string();
|
||||
auto audio_addr = session.audio.peer.address().to_string();
|
||||
|
||||
auto video_port = session.video.peer.port();
|
||||
auto audio_port = session.audio.peer.port();
|
||||
|
||||
auto &connections = session.broadcast_ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
auto validate_size = connections->size();
|
||||
for (auto it = std::begin(*connections); it != std::end(*connections);) {
|
||||
TUPLE_2D_REF(addr, port, *it);
|
||||
|
||||
if ((video_port == port && video_addr == addr) ||
|
||||
(audio_port == port && audio_addr == addr)) {
|
||||
it = connections->erase(it);
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
auto new_size = connections->size();
|
||||
if (validate_size != new_size + 2) {
|
||||
BOOST_LOG(warning) << "Couldn't remove reference to session connections: ending all broadcasts"sv;
|
||||
|
||||
// A reference to the event object is still stored somewhere else. So no need to keep
|
||||
// a reference to it.
|
||||
mail::man->event<bool>(mail::broadcast_shutdown)->raise(true);
|
||||
}
|
||||
}
|
||||
|
||||
// If this is the last session, invoke the platform callbacks
|
||||
if (--running_sessions == 0) {
|
||||
#if defined SUNSHINE_TRAY && SUNSHINE_TRAY >= 1
|
||||
@ -1753,7 +1744,14 @@ namespace stream {
|
||||
return -1;
|
||||
}
|
||||
|
||||
session.broadcast_ref->control_server.emplace_addr_to_session(addr_string, session);
|
||||
session.control.expected_peer_address = addr_string;
|
||||
BOOST_LOG(debug) << "Expecting incoming session connections from "sv << addr_string;
|
||||
|
||||
// Insert this session into the session list
|
||||
{
|
||||
auto lg = session.broadcast_ref->control_server._sessions.lock();
|
||||
session.broadcast_ref->control_server._sessions->push_back(&session);
|
||||
}
|
||||
|
||||
auto addr = boost::asio::ip::make_address(addr_string);
|
||||
session.video.peer.address(addr);
|
||||
@ -1762,16 +1760,6 @@ namespace stream {
|
||||
session.audio.peer.address(addr);
|
||||
session.audio.peer.port(0);
|
||||
|
||||
{
|
||||
auto &connections = session.broadcast_ref->audio_video_connections;
|
||||
|
||||
auto lg = connections.lock();
|
||||
|
||||
// allocate a location for connections
|
||||
connections->emplace_back(addr_string, 0);
|
||||
connections->emplace_back(addr_string, 0);
|
||||
}
|
||||
|
||||
session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
|
||||
|
||||
session.audioThread = std::thread { audioThread, &session };
|
||||
@ -1791,7 +1779,7 @@ namespace stream {
|
||||
}
|
||||
|
||||
std::shared_ptr<session_t>
|
||||
alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) {
|
||||
alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv, std::string_view av_ping_payload, uint32_t control_connect_data) {
|
||||
auto session = std::make_shared<session_t>();
|
||||
|
||||
auto mail = std::make_shared<safe::mail_raw_t>();
|
||||
@ -1800,6 +1788,7 @@ namespace stream {
|
||||
|
||||
session->config = config;
|
||||
|
||||
session->control.connect_data = control_connect_data;
|
||||
session->control.feedback_queue = mail->queue<platf::gamepad_feedback_msg_t>(mail::gamepad_feedback);
|
||||
session->control.hdr_queue = mail->event<video::hdr_info_t>(mail::hdr);
|
||||
session->control.iv = iv;
|
||||
@ -1810,6 +1799,7 @@ namespace stream {
|
||||
session->video.idr_events = mail->event<bool>(mail::idr);
|
||||
session->video.invalidate_ref_frames_events = mail->event<std::pair<int64_t, int64_t>>(mail::invalidate_ref_frames);
|
||||
session->video.lowseq = 0;
|
||||
session->video.ping_payload = av_ping_payload;
|
||||
|
||||
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
|
||||
|
||||
@ -1839,6 +1829,7 @@ namespace stream {
|
||||
gcm_key, true
|
||||
};
|
||||
|
||||
session->audio.ping_payload = av_ping_payload;
|
||||
session->audio.avRiKeyId = util::endian::big(*(std::uint32_t *) iv.data());
|
||||
session->audio.sequenceNumber = 0;
|
||||
session->audio.timestamp = 0;
|
||||
|
@ -22,7 +22,8 @@ namespace stream {
|
||||
|
||||
int packetsize;
|
||||
int minRequiredFecPackets;
|
||||
int featureFlags;
|
||||
int nvFeatureFlags;
|
||||
int mlFeatureFlags;
|
||||
int controlProtocolType;
|
||||
int audioQosType;
|
||||
int videoQosType;
|
||||
@ -39,7 +40,7 @@ namespace stream {
|
||||
};
|
||||
|
||||
std::shared_ptr<session_t>
|
||||
alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv);
|
||||
alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv, std::string_view av_ping_payload, uint32_t control_connect_data);
|
||||
int
|
||||
start(session_t &session, const std::string &addr_string);
|
||||
void
|
||||
|
Loading…
x
Reference in New Issue
Block a user