mirror of
https://github.com/LizardByte/Sunshine.git
synced 2025-01-30 12:32:43 +00:00
1953 lines
68 KiB
C++
1953 lines
68 KiB
C++
/**
|
|
* @file src/stream.cpp
|
|
* @brief Definitions for the streaming protocols (video, audio and control streams).
|
|
*/
|
|
#include "process.h"
|
|
|
|
#include <future>
|
|
#include <queue>
|
|
|
|
#include <fstream>
|
|
#include <openssl/err.h>
|
|
|
|
#include <boost/endian/arithmetic.hpp>
|
|
|
|
extern "C" {
|
|
#include <moonlight-common-c/src/Limelight-internal.h>
|
|
#include <rs.h>
|
|
}
|
|
|
|
#include "config.h"
|
|
#include "input.h"
|
|
#include "main.h"
|
|
#include "network.h"
|
|
#include "stat_trackers.h"
|
|
#include "stream.h"
|
|
#include "sync.h"
|
|
#include "system_tray.h"
|
|
#include "thread_safe.h"
|
|
#include "utility.h"
|
|
|
|
// Indices into packetTypes[] below. Slot 4 (Frame Stats) has no IDX_* name;
// the table marks that message as unused.
#define IDX_START_A 0
#define IDX_START_B 1
#define IDX_INVALIDATE_REF_FRAMES 2
#define IDX_LOSS_STATS 3
#define IDX_INPUT_DATA 5
#define IDX_RUMBLE_DATA 6
#define IDX_TERMINATION 7
#define IDX_PERIODIC_PING 8
#define IDX_REQUEST_IDR_FRAME 9
#define IDX_ENCRYPTED 10
#define IDX_HDR_MODE 11
#define IDX_RUMBLE_TRIGGER_DATA 12
#define IDX_SET_MOTION_EVENT 13
#define IDX_SET_RGB_LED 14

// Control stream message type for each slot; the position of an entry must
// match the value of its IDX_* macro.
static const short packetTypes[] = {
  0x0305,  // Start A
  0x0307,  // Start B
  0x0301,  // Invalidate reference frames
  0x0201,  // Loss Stats
  0x0204,  // Frame Stats (unused)
  0x0206,  // Input data
  0x010b,  // Rumble data
  0x0109,  // Termination
  0x0200,  // Periodic Ping
  0x0302,  // IDR frame
  0x0001,  // fully encrypted
  0x010e,  // HDR mode
  0x5500,  // Rumble triggers (Sunshine protocol extension)
  0x5501,  // Set motion event (Sunshine protocol extension)
  0x5502,  // Set RGB LED (Sunshine protocol extension)
};

// Catch a table/index mismatch at compile time instead of reading past the array.
static_assert(
  sizeof(packetTypes) / sizeof(packetTypes[0]) == IDX_SET_RGB_LED + 1,
  "packetTypes must have exactly one entry per IDX_* slot");
|
|
|
|
namespace asio = boost::asio;
|
|
namespace sys = boost::system;
|
|
|
|
using asio::ip::tcp;
|
|
using asio::ip::udp;
|
|
|
|
using namespace std::literals;
|
|
|
|
namespace stream {
|
|
|
|
// Selects between the video and the audio stream.
enum class socket_e : int {
  video = 0,
  audio = 1
};
|
|
|
|
#pragma pack(push, 1)
|
|
|
|
struct video_short_frame_header_t {
|
|
uint8_t *
|
|
payload() {
|
|
return (uint8_t *) (this + 1);
|
|
}
|
|
|
|
std::uint8_t headerType; // Always 0x01 for short headers
|
|
|
|
// Sunshine extension
|
|
// Frame processing latency, in 1/10 ms units
|
|
// zero when the frame is repeated or there is no backend implementation
|
|
boost::endian::little_uint16_at frame_processing_latency;
|
|
|
|
// Currently known values:
|
|
// 1 = Normal P-frame
|
|
// 2 = IDR-frame
|
|
// 4 = P-frame with intra-refresh blocks
|
|
// 5 = P-frame after reference frame invalidation
|
|
std::uint8_t frameType;
|
|
|
|
// Length of the final packet payload for codecs that cannot handle
|
|
// zero padding, such as AV1 (Sunshine extension).
|
|
boost::endian::little_uint16_at lastPayloadLen;
|
|
|
|
std::uint8_t unknown[2];
|
|
};
|
|
|
|
static_assert(
|
|
sizeof(video_short_frame_header_t) == 8,
|
|
"Short frame header must be 8 bytes");
|
|
|
|
struct video_packet_raw_t {
|
|
uint8_t *
|
|
payload() {
|
|
return (uint8_t *) (this + 1);
|
|
}
|
|
|
|
RTP_PACKET rtp;
|
|
char reserved[4];
|
|
|
|
NV_VIDEO_PACKET packet;
|
|
};
|
|
|
|
struct video_packet_enc_prefix_t {
|
|
video_packet_raw_t *
|
|
payload() {
|
|
return (video_packet_raw_t *) (this + 1);
|
|
}
|
|
|
|
std::uint8_t iv[12]; // 12-byte IV is ideal for AES-GCM
|
|
std::uint32_t frameNumber;
|
|
std::uint8_t tag[16];
|
|
};
|
|
|
|
struct audio_packet_raw_t {
|
|
uint8_t *
|
|
payload() {
|
|
return (uint8_t *) (this + 1);
|
|
}
|
|
|
|
RTP_PACKET rtp;
|
|
};
|
|
|
|
// Common header of a control stream message: message type and the length of
// the payload that follows it.
struct control_header_v2 {
  std::uint16_t type;
  std::uint16_t payloadLength;

  // Address of the first byte located directly after this header.
  uint8_t *
  payload() {
    return reinterpret_cast<uint8_t *>(this + 1);
  }
};
|
|
|
|
struct control_terminate_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint32_t ec;
|
|
};
|
|
|
|
struct control_rumble_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint32_t useless;
|
|
|
|
std::uint16_t id;
|
|
std::uint16_t lowfreq;
|
|
std::uint16_t highfreq;
|
|
};
|
|
|
|
struct control_rumble_triggers_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint16_t id;
|
|
std::uint16_t left;
|
|
std::uint16_t right;
|
|
};
|
|
|
|
struct control_set_motion_event_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint16_t id;
|
|
std::uint16_t reportrate;
|
|
std::uint8_t type;
|
|
};
|
|
|
|
struct control_set_rgb_led_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint16_t id;
|
|
std::uint8_t r;
|
|
std::uint8_t g;
|
|
std::uint8_t b;
|
|
};
|
|
|
|
struct control_hdr_mode_t {
|
|
control_header_v2 header;
|
|
|
|
std::uint8_t enabled;
|
|
|
|
// Sunshine protocol extension
|
|
SS_HDR_METADATA metadata;
|
|
};
|
|
|
|
// Outer header of an encrypted control stream message.
struct control_encrypted_t {
  std::uint16_t encryptedHeaderType;  // Always LE 0x0001
  std::uint16_t length;  // sizeof(seq) + 16 byte tag + secondary header and data

  // seq is accepted as an arbitrary value in Moonlight
  std::uint32_t seq;  // Monotonically increasing sequence number (used as IV for AES-GCM)

  // Address of the first byte located directly after this header.
  uint8_t *
  payload() {
    return reinterpret_cast<uint8_t *>(this + 1);
  }
  // encrypted control_header_v2 and payload data follow
};

using control_encrypted_p = control_encrypted_t *;
|
|
|
|
struct audio_fec_packet_raw_t {
|
|
uint8_t *
|
|
payload() {
|
|
return (uint8_t *) (this + 1);
|
|
}
|
|
|
|
RTP_PACKET rtp;
|
|
AUDIO_FEC_HEADER fecHeader;
|
|
};
|
|
|
|
#pragma pack(pop)
|
|
|
|
// Rounds `size` up to the next multiple of 16. Sizes that already are a
// multiple of 16 (including 0) are returned unchanged.
constexpr std::size_t
round_to_pkcs7_padded(std::size_t size) {
  constexpr std::size_t block = 16;
  const std::size_t whole_blocks = (size + (block - 1)) / block;
  return whole_blocks * block;
}
|
|
constexpr std::size_t MAX_AUDIO_PACKET_SIZE = 1400;
|
|
|
|
using rh_t = util::safe_ptr<reed_solomon, reed_solomon_release>;
|
|
using video_packet_t = util::c_ptr<video_packet_raw_t>;
|
|
using audio_packet_t = util::c_ptr<audio_packet_raw_t>;
|
|
using audio_fec_packet_t = util::c_ptr<audio_fec_packet_raw_t>;
|
|
using audio_aes_t = std::array<char, round_to_pkcs7_padded(MAX_AUDIO_PACKET_SIZE)>;
|
|
|
|
using av_session_id_t = std::variant<asio::ip::address, std::string>; // IP address or SS-Ping-Payload from RTSP handshake
|
|
using message_queue_t = std::shared_ptr<safe::queue_t<std::pair<udp::endpoint, std::string>>>;
|
|
using message_queue_queue_t = std::shared_ptr<safe::queue_t<std::tuple<socket_e, av_session_id_t, message_queue_t>>>;
|
|
|
|
// return bytes written on success
|
|
// return -1 on error
|
|
static inline int
|
|
encode_audio(bool encrypted, const audio::buffer_t &plaintext, audio_packet_t &destination, crypto::aes_t &iv, crypto::cipher::cbc_t &cbc) {
|
|
// If encryption isn't enabled
|
|
if (!encrypted) {
|
|
std::copy(std::begin(plaintext), std::end(plaintext), destination->payload());
|
|
return plaintext.size();
|
|
}
|
|
|
|
return cbc.encrypt(std::string_view { (char *) std::begin(plaintext), plaintext.size() }, destination->payload(), &iv);
|
|
}
|
|
|
|
static inline void
|
|
while_starting_do_nothing(std::atomic<session::state_e> &state) {
|
|
while (state.load(std::memory_order_acquire) == session::state_e::STARTING) {
|
|
std::this_thread::sleep_for(1ms);
|
|
}
|
|
}
|
|
|
|
class control_server_t {
|
|
public:
|
|
int
|
|
bind(net::af_e address_family, std::uint16_t port) {
|
|
_host = net::host_create(address_family, _addr, config::stream.channels, port);
|
|
|
|
return !(bool) _host;
|
|
}
|
|
|
|
// Get session associated with address.
|
|
// If none are found, try to find a session not yet claimed. (It will be marked by a port of value 0
|
|
// If none of those are found, return nullptr
|
|
session_t *
|
|
get_session(const net::peer_t peer, uint32_t connect_data);
|
|
|
|
// Circular dependency:
|
|
// iterate refers to session
|
|
// session refers to broadcast_ctx_t
|
|
// broadcast_ctx_t refers to control_server_t
|
|
// Therefore, iterate is implemented further down the source file
|
|
void
|
|
iterate(std::chrono::milliseconds timeout);
|
|
|
|
/**
|
|
* @brief Calls the handler for a given control stream message.
|
|
* @param type The message type.
|
|
* @param session The session the message was received on.
|
|
* @param payload The payload of the message.
|
|
* @param reinjected `true` if this message is being reprocessed after decryption.
|
|
*/
|
|
void
|
|
call(std::uint16_t type, session_t *session, const std::string_view &payload, bool reinjected);
|
|
|
|
void
|
|
map(uint16_t type, std::function<void(session_t *, const std::string_view &)> cb) {
|
|
_map_type_cb.emplace(type, std::move(cb));
|
|
}
|
|
|
|
int
|
|
send(const std::string_view &payload, net::peer_t peer) {
|
|
auto packet = enet_packet_create(payload.data(), payload.size(), ENET_PACKET_FLAG_RELIABLE);
|
|
if (enet_peer_send(peer, 0, packet)) {
|
|
enet_packet_destroy(packet);
|
|
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
flush() {
|
|
enet_host_flush(_host.get());
|
|
}
|
|
|
|
// Callbacks
|
|
std::unordered_map<std::uint16_t, std::function<void(session_t *, const std::string_view &)>> _map_type_cb;
|
|
|
|
// All active sessions (including those still waiting for a peer to connect)
|
|
sync_util::sync_t<std::vector<session_t *>> _sessions;
|
|
|
|
// ENet peer to session mapping for sessions with a peer connected
|
|
sync_util::sync_t<std::map<net::peer_t, session_t *>> _peer_to_session;
|
|
|
|
ENetAddress _addr;
|
|
net::host_t _host;
|
|
};
|
|
|
|
struct broadcast_ctx_t {
|
|
message_queue_queue_t message_queue_queue;
|
|
|
|
std::thread recv_thread;
|
|
std::thread video_thread;
|
|
std::thread audio_thread;
|
|
std::thread control_thread;
|
|
|
|
asio::io_service io;
|
|
|
|
udp::socket video_sock { io };
|
|
udp::socket audio_sock { io };
|
|
|
|
control_server_t control_server;
|
|
};
|
|
|
|
struct session_t {
|
|
config_t config;
|
|
|
|
safe::mail_t mail;
|
|
|
|
std::shared_ptr<input::input_t> input;
|
|
|
|
std::thread audioThread;
|
|
std::thread videoThread;
|
|
|
|
std::chrono::steady_clock::time_point pingTimeout;
|
|
|
|
safe::shared_t<broadcast_ctx_t>::ptr_t broadcast_ref;
|
|
|
|
boost::asio::ip::address localAddress;
|
|
|
|
struct {
|
|
std::string ping_payload;
|
|
|
|
int lowseq;
|
|
udp::endpoint peer;
|
|
|
|
std::optional<crypto::cipher::gcm_t> cipher;
|
|
std::uint64_t gcm_iv_counter;
|
|
|
|
safe::mail_raw_t::event_t<bool> idr_events;
|
|
safe::mail_raw_t::event_t<std::pair<int64_t, int64_t>> invalidate_ref_frames_events;
|
|
|
|
std::unique_ptr<platf::deinit_t> qos;
|
|
} video;
|
|
|
|
struct {
|
|
crypto::cipher::cbc_t cipher;
|
|
std::string ping_payload;
|
|
|
|
std::uint16_t sequenceNumber;
|
|
// avRiKeyId == util::endian::big(First (sizeof(avRiKeyId)) bytes of launch_session->iv)
|
|
std::uint32_t avRiKeyId;
|
|
std::uint32_t timestamp;
|
|
udp::endpoint peer;
|
|
|
|
util::buffer_t<char> shards;
|
|
util::buffer_t<uint8_t *> shards_p;
|
|
|
|
audio_fec_packet_t fec_packet;
|
|
std::unique_ptr<platf::deinit_t> qos;
|
|
} audio;
|
|
|
|
struct {
|
|
crypto::cipher::gcm_t cipher;
|
|
crypto::aes_t legacy_input_enc_iv; // Only used when the client doesn't support full control stream encryption
|
|
crypto::aes_t incoming_iv;
|
|
crypto::aes_t outgoing_iv;
|
|
|
|
std::uint32_t connect_data; // Used for new clients with ML_FF_SESSION_ID_V1
|
|
std::string expected_peer_address; // Only used for legacy clients without ML_FF_SESSION_ID_V1
|
|
|
|
net::peer_t peer;
|
|
std::uint32_t seq;
|
|
|
|
platf::feedback_queue_t feedback_queue;
|
|
safe::mail_raw_t::event_t<video::hdr_info_t> hdr_queue;
|
|
} control;
|
|
|
|
safe::mail_raw_t::event_t<bool> shutdown_event;
|
|
safe::signal_t controlEnd;
|
|
|
|
std::atomic<session::state_e> state;
|
|
};
|
|
|
|
/**
|
|
* First part of cipher must be struct of type control_encrypted_t
|
|
*
|
|
* returns empty string_view on failure
|
|
* returns string_view pointing to payload data
|
|
*/
|
|
template <std::size_t max_payload_size>
|
|
static inline std::string_view
|
|
encode_control(session_t *session, const std::string_view &plaintext, std::array<std::uint8_t, max_payload_size> &tagged_cipher) {
|
|
static_assert(
|
|
max_payload_size >= sizeof(control_encrypted_t) + sizeof(crypto::cipher::tag_size),
|
|
"max_payload_size >= sizeof(control_encrypted_t) + sizeof(crypto::cipher::tag_size)");
|
|
|
|
if (session->config.controlProtocolType != 13) {
|
|
return plaintext;
|
|
}
|
|
|
|
auto seq = session->control.seq++;
|
|
|
|
auto &iv = session->control.outgoing_iv;
|
|
if (session->config.encryptionFlagsEnabled & SS_ENC_CONTROL_V2) {
|
|
// We use the deterministic IV construction algorithm specified in NIST SP 800-38D
|
|
// Section 8.2.1. The sequence number is our "invocation" field and the 'CH' in the
|
|
// high bytes is the "fixed" field. Because each client provides their own unique
|
|
// key, our values in the fixed field need only uniquely identify each independent
|
|
// use of the client's key with AES-GCM in our code.
|
|
//
|
|
// The sequence number is 32 bits long which allows for 2^32 control stream messages
|
|
// to be sent to each client before the IV repeats.
|
|
iv.resize(12);
|
|
std::copy_n((uint8_t *) &seq, sizeof(seq), std::begin(iv));
|
|
iv[10] = 'H'; // Host originated
|
|
iv[11] = 'C'; // Control stream
|
|
}
|
|
else {
|
|
// Nvidia's old style encryption uses a 16-byte IV
|
|
iv.resize(16);
|
|
|
|
iv[0] = (std::uint8_t) seq;
|
|
}
|
|
|
|
auto packet = (control_encrypted_p) tagged_cipher.data();
|
|
|
|
auto bytes = session->control.cipher.encrypt(plaintext, packet->payload(), &iv);
|
|
if (bytes <= 0) {
|
|
BOOST_LOG(error) << "Couldn't encrypt control data"sv;
|
|
return {};
|
|
}
|
|
|
|
std::uint16_t packet_length = bytes + crypto::cipher::tag_size + sizeof(control_encrypted_t::seq);
|
|
|
|
packet->encryptedHeaderType = util::endian::little(0x0001);
|
|
packet->length = util::endian::little(packet_length);
|
|
packet->seq = util::endian::little(seq);
|
|
|
|
return std::string_view { (char *) tagged_cipher.data(), packet_length + sizeof(control_encrypted_t) - sizeof(control_encrypted_t::seq) };
|
|
}
|
|
|
|
int
|
|
start_broadcast(broadcast_ctx_t &ctx);
|
|
void
|
|
end_broadcast(broadcast_ctx_t &ctx);
|
|
|
|
static auto broadcast = safe::make_shared<broadcast_ctx_t>(start_broadcast, end_broadcast);
|
|
|
|
// Returns the session that belongs to `peer`. A peer seen for the first time
// is bound to the first session that has no control peer yet and that matches
// either `connect_data` (clients with ML_FF_SESSION_ID_V1) or the peer's
// address (legacy clients). Returns nullptr when no session matches.
session_t *
control_server_t::get_session(const net::peer_t peer, uint32_t connect_data) {
  {
    // Fast path - the peer has already been bound to a session
    auto lg = _peer_to_session.lock();
    auto known = _peer_to_session->find(peer);
    if (known != _peer_to_session->end()) {
      return known->second;
    }
  }

  // Slow path - try to bind the peer to a session that is still unclaimed
  TUPLE_2D(peer_port, peer_addr, platf::from_sockaddr_ex((sockaddr *) &peer->address.address));
  auto lg = _sessions.lock();
  for (session_t *candidate : *_sessions) {
    // Sessions that already have a control peer are not up for grabs
    if (candidate->control.peer) {
      continue;
    }

    // Identify the connection by the unique connect data if the client supports it.
    // Only fall back to IP address matching for clients without session ID support.
    if (candidate->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) {
      if (candidate->control.connect_data != connect_data) {
        continue;
      }

      BOOST_LOG(debug) << "Initialized new control stream session by connect data match [v2]"sv;
    }
    else {
      if (candidate->control.expected_peer_address != peer_addr) {
        continue;
      }

      BOOST_LOG(debug) << "Initialized new control stream session by IP address match [v1]"sv;
    }

    candidate->control.peer = peer;

    // Use the local address from the control connection as the source address
    // for other communications to the client. This is necessary to ensure
    // proper routing on multi-homed hosts.
    auto local_address = platf::from_sockaddr((sockaddr *) &peer->localAddress.address);
    candidate->localAddress = boost::asio::ip::make_address(local_address);

    BOOST_LOG(debug) << "Control local address ["sv << local_address << ']';
    BOOST_LOG(debug) << "Control peer address ["sv << peer_addr << ':' << peer_port << ']';

    // Remember the binding so the fast path finds it next time
    auto ptslg = _peer_to_session.lock();
    _peer_to_session->emplace(peer, candidate);
    return candidate;
  }

  return nullptr;
}
|
|
|
|
/**
|
|
* @brief Calls the handler for a given control stream message.
|
|
* @param type The message type.
|
|
* @param session The session the message was received on.
|
|
* @param payload The payload of the message.
|
|
* @param reinjected `true` if this message is being reprocessed after decryption.
|
|
*/
|
|
void
|
|
control_server_t::call(std::uint16_t type, session_t *session, const std::string_view &payload, bool reinjected) {
|
|
// If we are using the encrypted control stream protocol, drop any messages that come off the wire unencrypted
|
|
if (session->config.controlProtocolType == 13 && !reinjected && type != packetTypes[IDX_ENCRYPTED]) {
|
|
BOOST_LOG(error) << "Dropping unencrypted message on encrypted control stream: "sv << util::hex(type).to_string_view();
|
|
return;
|
|
}
|
|
|
|
auto cb = _map_type_cb.find(type);
|
|
if (cb == std::end(_map_type_cb)) {
|
|
BOOST_LOG(debug)
|
|
<< "type [Unknown] { "sv << util::hex(type).to_string_view() << " }"sv << std::endl
|
|
<< "---data---"sv << std::endl
|
|
<< util::hex_vec(payload) << std::endl
|
|
<< "---end data---"sv;
|
|
}
|
|
else {
|
|
cb->second(session, payload);
|
|
}
|
|
}
|
|
|
|
void
|
|
control_server_t::iterate(std::chrono::milliseconds timeout) {
|
|
ENetEvent event;
|
|
auto res = enet_host_service(_host.get(), &event, timeout.count());
|
|
|
|
if (res > 0) {
|
|
auto session = get_session(event.peer, event.data);
|
|
if (!session) {
|
|
BOOST_LOG(warning) << "Rejected connection from ["sv << platf::from_sockaddr((sockaddr *) &event.peer->address.address) << "]: it's not properly set up"sv;
|
|
enet_peer_disconnect_now(event.peer, 0);
|
|
|
|
return;
|
|
}
|
|
|
|
session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
|
|
|
|
switch (event.type) {
|
|
case ENET_EVENT_TYPE_RECEIVE: {
|
|
net::packet_t packet { event.packet };
|
|
|
|
auto type = *(std::uint16_t *) packet->data;
|
|
std::string_view payload { (char *) packet->data + sizeof(type), packet->dataLength - sizeof(type) };
|
|
|
|
call(type, session, payload, false);
|
|
} break;
|
|
case ENET_EVENT_TYPE_CONNECT:
|
|
BOOST_LOG(info) << "CLIENT CONNECTED"sv;
|
|
break;
|
|
case ENET_EVENT_TYPE_DISCONNECT:
|
|
BOOST_LOG(info) << "CLIENT DISCONNECTED"sv;
|
|
// No more clients to send video data to ^_^
|
|
if (session->state == session::state_e::RUNNING) {
|
|
session::stop(*session);
|
|
}
|
|
break;
|
|
case ENET_EVENT_TYPE_NONE:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
namespace fec {
|
|
using rs_t = util::safe_ptr<reed_solomon, reed_solomon_release>;
|
|
|
|
struct fec_t {
|
|
size_t data_shards;
|
|
size_t nr_shards;
|
|
size_t percentage;
|
|
|
|
size_t blocksize;
|
|
size_t prefixsize;
|
|
util::buffer_t<char> shards;
|
|
|
|
char *
|
|
data(size_t el) {
|
|
return &shards[(el + 1) * prefixsize + el * blocksize];
|
|
}
|
|
|
|
char *
|
|
prefix(size_t el) {
|
|
return &shards[el * (prefixsize + blocksize)];
|
|
}
|
|
|
|
size_t
|
|
size() const {
|
|
return nr_shards;
|
|
}
|
|
};
|
|
|
|
static fec_t
|
|
encode(const std::string_view &payload, size_t blocksize, size_t fecpercentage, size_t minparityshards, size_t prefixsize) {
|
|
auto payload_size = payload.size();
|
|
|
|
auto pad = payload_size % blocksize != 0;
|
|
|
|
auto data_shards = payload_size / blocksize + (pad ? 1 : 0);
|
|
auto parity_shards = (data_shards * fecpercentage + 99) / 100;
|
|
|
|
// increase the FEC percentage for this frame if the parity shard minimum is not met
|
|
if (parity_shards < minparityshards) {
|
|
parity_shards = minparityshards;
|
|
fecpercentage = (100 * parity_shards) / data_shards;
|
|
|
|
BOOST_LOG(verbose) << "Increasing FEC percentage to "sv << fecpercentage << " to meet parity shard minimum"sv << std::endl;
|
|
}
|
|
|
|
auto nr_shards = data_shards + parity_shards;
|
|
if (nr_shards > DATA_SHARDS_MAX) {
|
|
BOOST_LOG(warning)
|
|
<< "Number of fragments for reed solomon exceeds DATA_SHARDS_MAX"sv << std::endl
|
|
<< nr_shards << " > "sv << DATA_SHARDS_MAX
|
|
<< ", skipping error correction"sv;
|
|
|
|
nr_shards = data_shards;
|
|
fecpercentage = 0;
|
|
}
|
|
|
|
util::buffer_t<char> shards { nr_shards * (blocksize + prefixsize) };
|
|
util::buffer_t<uint8_t *> shards_p { nr_shards };
|
|
|
|
auto next = std::begin(payload);
|
|
for (auto x = 0; x < nr_shards; ++x) {
|
|
shards_p[x] = (uint8_t *) &shards[(x + 1) * prefixsize + x * blocksize];
|
|
|
|
auto copy_len = std::min<size_t>(blocksize, std::end(payload) - next);
|
|
std::copy_n(next, copy_len, shards_p[x]);
|
|
if (copy_len < blocksize) {
|
|
// Zero any additional space after the end of the payload
|
|
std::fill_n(shards_p[x] + copy_len, blocksize - copy_len, 0);
|
|
}
|
|
|
|
next += copy_len;
|
|
}
|
|
|
|
if (data_shards + parity_shards <= DATA_SHARDS_MAX) {
|
|
// packets = parity_shards + data_shards
|
|
rs_t rs { reed_solomon_new(data_shards, parity_shards) };
|
|
|
|
reed_solomon_encode(rs.get(), shards_p.begin(), nr_shards, blocksize);
|
|
}
|
|
|
|
return {
|
|
data_shards,
|
|
nr_shards,
|
|
fecpercentage,
|
|
blocksize,
|
|
prefixsize,
|
|
std::move(shards)
|
|
};
|
|
}
|
|
} // namespace fec
|
|
|
|
/**
 * @brief Splits data into slices and puts a header in front of every slice.
 *
 * The result consists of `insert_size` header bytes followed by up to
 * `slice_size` data bytes, repeated for every slice; only the last slice may
 * be shorter than `slice_size`. The header bytes are left for `f` to fill in.
 *
 * @param insert_size The number of header bytes in front of each slice.
 * @param slice_size The maximum number of data bytes per slice; must not be zero.
 * @param data The data to split.
 * @param f Called as f(header, index, count) once per slice, before the slice's data is copied.
 * @return The combined headers and slices; empty if `data` is empty.
 */
template <class F>
std::vector<uint8_t>
insert(uint64_t insert_size, uint64_t slice_size, const std::string_view &data, F &&f) {
  std::vector<uint8_t> result;

  // Without data there are no slices. Bail out early: the code below assumes
  // at least one slice and would otherwise compute `elements - 1` on zero.
  if (data.empty()) {
    return result;
  }

  auto pad = data.size() % slice_size != 0;
  auto elements = data.size() / slice_size + (pad ? 1 : 0);

  result.resize(elements * insert_size + data.size());

  // All slices but the last one are exactly slice_size bytes long
  auto next = std::begin(data);
  for (auto x = 0; x < elements - 1; ++x) {
    void *p = &result[x * (insert_size + slice_size)];

    f(p, x, elements);

    std::copy(next, next + slice_size, (char *) p + insert_size);
    next += slice_size;
  }

  // The last slice takes whatever is left
  auto x = elements - 1;
  void *p = &result[x * (insert_size + slice_size)];

  f(p, x, elements);

  std::copy(next, std::end(data), (char *) p + insert_size);

  return result;
}
|
|
|
|
// Returns a copy of `original` in which the first occurrence of `old` is
// replaced by `_new`. If `old` does not occur, the copy is unchanged.
std::vector<uint8_t>
replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) {
  const auto first = std::begin(original);
  const auto last = std::end(original);
  const auto match = std::search(first, last, std::begin(old), std::end(old));

  // Everything in front of the match (or the whole input if there is none)
  std::vector<uint8_t> out;
  out.insert(std::end(out), first, match);
  if (match == last) {
    return out;
  }

  // The replacement, then whatever follows the matched text
  out.insert(std::end(out), std::begin(_new), std::end(_new));
  out.insert(std::end(out), match + old.size(), last);

  return out;
}
|
|
|
|
/**
|
|
* @brief Pass gamepad feedback data back to the client.
|
|
* @param session The session object.
|
|
* @param msg The message to pass.
|
|
* @return 0 on success.
|
|
*/
|
|
int
|
|
send_feedback_msg(session_t *session, platf::gamepad_feedback_msg_t &msg) {
|
|
if (!session->control.peer) {
|
|
BOOST_LOG(warning) << "Couldn't send gamepad feedback data, still waiting for PING from Moonlight"sv;
|
|
// Still waiting for PING from Moonlight
|
|
return -1;
|
|
}
|
|
|
|
std::string payload;
|
|
if (msg.type == platf::gamepad_feedback_e::rumble) {
|
|
control_rumble_t plaintext;
|
|
plaintext.header.type = packetTypes[IDX_RUMBLE_DATA];
|
|
plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2);
|
|
|
|
auto &data = msg.data.rumble;
|
|
|
|
plaintext.useless = 0xC0FFEE;
|
|
plaintext.id = util::endian::little(msg.id);
|
|
plaintext.lowfreq = util::endian::little(data.lowfreq);
|
|
plaintext.highfreq = util::endian::little(data.highfreq);
|
|
|
|
BOOST_LOG(verbose) << "Rumble: "sv << msg.id << " :: "sv << util::hex(data.lowfreq).to_string_view() << " :: "sv << util::hex(data.highfreq).to_string_view();
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
}
|
|
else if (msg.type == platf::gamepad_feedback_e::rumble_triggers) {
|
|
control_rumble_triggers_t plaintext;
|
|
plaintext.header.type = packetTypes[IDX_RUMBLE_TRIGGER_DATA];
|
|
plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2);
|
|
|
|
auto &data = msg.data.rumble_triggers;
|
|
|
|
plaintext.id = util::endian::little(msg.id);
|
|
plaintext.left = util::endian::little(data.left_trigger);
|
|
plaintext.right = util::endian::little(data.right_trigger);
|
|
|
|
BOOST_LOG(verbose) << "Rumble triggers: "sv << msg.id << " :: "sv << util::hex(data.left_trigger).to_string_view() << " :: "sv << util::hex(data.right_trigger).to_string_view();
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
}
|
|
else if (msg.type == platf::gamepad_feedback_e::set_motion_event_state) {
|
|
control_set_motion_event_t plaintext;
|
|
plaintext.header.type = packetTypes[IDX_SET_MOTION_EVENT];
|
|
plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2);
|
|
|
|
auto &data = msg.data.motion_event_state;
|
|
|
|
plaintext.id = util::endian::little(msg.id);
|
|
plaintext.reportrate = util::endian::little(data.report_rate);
|
|
plaintext.type = data.motion_type;
|
|
|
|
BOOST_LOG(verbose) << "Motion event state: "sv << msg.id << " :: "sv << util::hex(data.report_rate).to_string_view() << " :: "sv << util::hex(data.motion_type).to_string_view();
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
}
|
|
else if (msg.type == platf::gamepad_feedback_e::set_rgb_led) {
|
|
control_set_rgb_led_t plaintext;
|
|
plaintext.header.type = packetTypes[IDX_SET_RGB_LED];
|
|
plaintext.header.payloadLength = sizeof(plaintext) - sizeof(control_header_v2);
|
|
|
|
auto &data = msg.data.rgb_led;
|
|
|
|
plaintext.id = util::endian::little(msg.id);
|
|
plaintext.r = data.r;
|
|
plaintext.g = data.g;
|
|
plaintext.b = data.b;
|
|
|
|
BOOST_LOG(verbose) << "RGB: "sv << msg.id << " :: "sv << util::hex(data.r).to_string_view() << util::hex(data.g).to_string_view() << util::hex(data.b).to_string_view();
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
}
|
|
else {
|
|
BOOST_LOG(error) << "Unknown gamepad feedback message type"sv;
|
|
return -1;
|
|
}
|
|
|
|
if (session->broadcast_ref->control_server.send(payload, session->control.peer)) {
|
|
TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address));
|
|
BOOST_LOG(warning) << "Couldn't send gamepad feedback to ["sv << addr << ':' << port << ']';
|
|
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
send_hdr_mode(session_t *session, video::hdr_info_t hdr_info) {
|
|
if (!session->control.peer) {
|
|
BOOST_LOG(warning) << "Couldn't send HDR mode, still waiting for PING from Moonlight"sv;
|
|
// Still waiting for PING from Moonlight
|
|
return -1;
|
|
}
|
|
|
|
control_hdr_mode_t plaintext {};
|
|
plaintext.header.type = packetTypes[IDX_HDR_MODE];
|
|
plaintext.header.payloadLength = sizeof(control_hdr_mode_t) - sizeof(control_header_v2);
|
|
|
|
plaintext.enabled = hdr_info->enabled;
|
|
plaintext.metadata = hdr_info->metadata;
|
|
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
auto payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
if (session->broadcast_ref->control_server.send(payload, session->control.peer)) {
|
|
TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address));
|
|
BOOST_LOG(warning) << "Couldn't send HDR mode to ["sv << addr << ':' << port << ']';
|
|
|
|
return -1;
|
|
}
|
|
|
|
BOOST_LOG(debug) << "Sent HDR mode: " << hdr_info->enabled;
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
controlBroadcastThread(control_server_t *server) {
|
|
server->map(packetTypes[IDX_PERIODIC_PING], [](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(verbose) << "type [IDX_START_A]"sv;
|
|
});
|
|
|
|
server->map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(debug) << "type [IDX_START_A]"sv;
|
|
});
|
|
|
|
server->map(packetTypes[IDX_START_B], [&](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(debug) << "type [IDX_START_B]"sv;
|
|
});
|
|
|
|
server->map(packetTypes[IDX_LOSS_STATS], [&](session_t *session, const std::string_view &payload) {
|
|
int32_t *stats = (int32_t *) payload.data();
|
|
auto count = stats[0];
|
|
std::chrono::milliseconds t { stats[1] };
|
|
|
|
auto lastGoodFrame = stats[3];
|
|
|
|
BOOST_LOG(verbose)
|
|
<< "type [IDX_LOSS_STATS]"sv << std::endl
|
|
<< "---begin stats---" << std::endl
|
|
<< "loss count since last report [" << count << ']' << std::endl
|
|
<< "time in milli since last report [" << t.count() << ']' << std::endl
|
|
<< "last good frame [" << lastGoodFrame << ']' << std::endl
|
|
<< "---end stats---";
|
|
});
|
|
|
|
server->map(packetTypes[IDX_REQUEST_IDR_FRAME], [&](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(debug) << "type [IDX_REQUEST_IDR_FRAME]"sv;
|
|
|
|
session->video.idr_events->raise(true);
|
|
});
|
|
|
|
server->map(packetTypes[IDX_INVALIDATE_REF_FRAMES], [&](session_t *session, const std::string_view &payload) {
|
|
auto frames = (std::int64_t *) payload.data();
|
|
auto firstFrame = frames[0];
|
|
auto lastFrame = frames[1];
|
|
|
|
BOOST_LOG(debug)
|
|
<< "type [IDX_INVALIDATE_REF_FRAMES]"sv << std::endl
|
|
<< "firstFrame [" << firstFrame << ']' << std::endl
|
|
<< "lastFrame [" << lastFrame << ']';
|
|
|
|
session->video.invalidate_ref_frames_events->raise(std::make_pair(firstFrame, lastFrame));
|
|
});
|
|
|
|
server->map(packetTypes[IDX_INPUT_DATA], [&](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(debug) << "type [IDX_INPUT_DATA]"sv;
|
|
|
|
auto tagged_cipher_length = util::endian::big(*(int32_t *) payload.data());
|
|
std::string_view tagged_cipher { payload.data() + sizeof(tagged_cipher_length), (size_t) tagged_cipher_length };
|
|
|
|
std::vector<uint8_t> plaintext;
|
|
|
|
auto &cipher = session->control.cipher;
|
|
auto &iv = session->control.legacy_input_enc_iv;
|
|
if (cipher.decrypt(tagged_cipher, plaintext, &iv)) {
|
|
// something went wrong :(
|
|
|
|
BOOST_LOG(error) << "Failed to verify tag"sv;
|
|
|
|
session::stop(*session);
|
|
return;
|
|
}
|
|
|
|
if (tagged_cipher_length >= 16 + iv.size()) {
|
|
std::copy(payload.end() - 16, payload.end(), std::begin(iv));
|
|
}
|
|
|
|
input::passthrough(session->input, std::move(plaintext));
|
|
});
|
|
|
|
server->map(packetTypes[IDX_ENCRYPTED], [server](session_t *session, const std::string_view &payload) {
|
|
BOOST_LOG(verbose) << "type [IDX_ENCRYPTED]"sv;
|
|
|
|
auto header = (control_encrypted_p) (payload.data() - 2);
|
|
|
|
auto length = util::endian::little(header->length);
|
|
auto seq = util::endian::little(header->seq);
|
|
|
|
if (length < (16 + 4 + 4)) {
|
|
BOOST_LOG(warning) << "Control: Runt packet"sv;
|
|
return;
|
|
}
|
|
|
|
auto tagged_cipher_length = length - 4;
|
|
std::string_view tagged_cipher { (char *) header->payload(), (size_t) tagged_cipher_length };
|
|
|
|
auto &cipher = session->control.cipher;
|
|
auto &iv = session->control.incoming_iv;
|
|
if (session->config.encryptionFlagsEnabled & SS_ENC_CONTROL_V2) {
|
|
// We use the deterministic IV construction algorithm specified in NIST SP 800-38D
|
|
// Section 8.2.1. The sequence number is our "invocation" field and the 'CC' in the
|
|
// high bytes is the "fixed" field. Because each client provides their own unique
|
|
// key, our values in the fixed field need only uniquely identify each independent
|
|
// use of the client's key with AES-GCM in our code.
|
|
//
|
|
// The sequence number is 32 bits long which allows for 2^32 control stream messages
|
|
// to be received from each client before the IV repeats.
|
|
iv.resize(12);
|
|
std::copy_n((uint8_t *) &seq, sizeof(seq), std::begin(iv));
|
|
iv[10] = 'C'; // Client originated
|
|
iv[11] = 'C'; // Control stream
|
|
}
|
|
else {
|
|
// Nvidia's old style encryption uses a 16-byte IV
|
|
iv.resize(16);
|
|
|
|
iv[0] = (std::uint8_t) seq;
|
|
}
|
|
|
|
std::vector<uint8_t> plaintext;
|
|
if (cipher.decrypt(tagged_cipher, plaintext, &iv)) {
|
|
// something went wrong :(
|
|
|
|
BOOST_LOG(error) << "Failed to verify tag"sv;
|
|
|
|
session::stop(*session);
|
|
return;
|
|
}
|
|
|
|
auto type = *(std::uint16_t *) plaintext.data();
|
|
std::string_view next_payload { (char *) plaintext.data() + 4, plaintext.size() - 4 };
|
|
|
|
if (type == packetTypes[IDX_ENCRYPTED]) {
|
|
BOOST_LOG(error) << "Bad packet type [IDX_ENCRYPTED] found"sv;
|
|
session::stop(*session);
|
|
return;
|
|
}
|
|
|
|
// The IDX_INPUT_DATA callback would attempt to decrypt this already-decrypted data, therefore we need to pass it through directly
|
|
if (type == packetTypes[IDX_INPUT_DATA]) {
|
|
plaintext.erase(std::begin(plaintext), std::begin(plaintext) + 4);
|
|
input::passthrough(session->input, std::move(plaintext));
|
|
}
|
|
else {
|
|
server->call(type, session, next_payload, true);
|
|
}
|
|
});
|
|
|
|
// This thread handles latency-sensitive control messages
|
|
platf::adjust_thread_priority(platf::thread_priority_e::critical);
|
|
|
|
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
|
|
while (!shutdown_event->peek()) {
|
|
bool has_session_awaiting_peer = false;
|
|
|
|
{
|
|
auto lg = server->_sessions.lock();
|
|
|
|
auto now = std::chrono::steady_clock::now();
|
|
|
|
KITTY_WHILE_LOOP(auto pos = std::begin(*server->_sessions), pos != std::end(*server->_sessions), {
|
|
auto session = *pos;
|
|
|
|
if (now > session->pingTimeout) {
|
|
auto address = session->control.peer ? platf::from_sockaddr((sockaddr *) &session->control.peer->address.address) : session->control.expected_peer_address;
|
|
BOOST_LOG(info) << address << ": Ping Timeout"sv;
|
|
session::stop(*session);
|
|
}
|
|
|
|
if (session->state.load(std::memory_order_acquire) == session::state_e::STOPPING) {
|
|
pos = server->_sessions->erase(pos);
|
|
|
|
if (session->control.peer) {
|
|
{
|
|
auto ptslg = server->_peer_to_session.lock();
|
|
server->_peer_to_session->erase(session->control.peer);
|
|
}
|
|
|
|
enet_peer_disconnect_now(session->control.peer, 0);
|
|
}
|
|
|
|
session->controlEnd.raise(true);
|
|
continue;
|
|
}
|
|
|
|
// Remember if we have a session that's waiting for a peer to connect to the
|
|
// control stream. This ensures the clients are properly notified even when
|
|
// the app terminates before they finish connecting.
|
|
if (!session->control.peer) {
|
|
has_session_awaiting_peer = true;
|
|
}
|
|
else {
|
|
auto &feedback_queue = session->control.feedback_queue;
|
|
while (feedback_queue->peek()) {
|
|
auto feedback_msg = feedback_queue->pop();
|
|
|
|
send_feedback_msg(session, *feedback_msg);
|
|
}
|
|
|
|
auto &hdr_queue = session->control.hdr_queue;
|
|
while (session->control.peer && hdr_queue->peek()) {
|
|
auto hdr_info = hdr_queue->pop();
|
|
|
|
send_hdr_mode(session, std::move(hdr_info));
|
|
}
|
|
}
|
|
|
|
++pos;
|
|
})
|
|
}
|
|
|
|
// Don't break until any pending sessions either expire or connect
|
|
if (proc::proc.running() == 0 && !has_session_awaiting_peer) {
|
|
BOOST_LOG(info) << "Process terminated"sv;
|
|
break;
|
|
}
|
|
|
|
server->iterate(150ms);
|
|
}
|
|
|
|
// Let all remaining connections know the server is shutting down
|
|
// reason: graceful termination
|
|
std::uint32_t reason = 0x80030023;
|
|
|
|
control_terminate_t plaintext;
|
|
plaintext.header.type = packetTypes[IDX_TERMINATION];
|
|
plaintext.header.payloadLength = sizeof(plaintext.ec);
|
|
plaintext.ec = util::endian::big<uint32_t>(reason);
|
|
|
|
std::array<std::uint8_t,
|
|
sizeof(control_encrypted_t) + crypto::cipher::round_to_pkcs7_padded(sizeof(plaintext)) + crypto::cipher::tag_size>
|
|
encrypted_payload;
|
|
|
|
auto lg = server->_sessions.lock();
|
|
for (auto pos = std::begin(*server->_sessions); pos != std::end(*server->_sessions); ++pos) {
|
|
auto session = *pos;
|
|
|
|
// We may not have gotten far enough to have an ENet connection yet
|
|
if (session->control.peer) {
|
|
auto payload = encode_control(session, util::view(plaintext), encrypted_payload);
|
|
|
|
if (server->send(payload, session->control.peer)) {
|
|
TUPLE_2D(port, addr, platf::from_sockaddr_ex((sockaddr *) &session->control.peer->address.address));
|
|
BOOST_LOG(warning) << "Couldn't send termination code to ["sv << addr << ':' << port << ']';
|
|
}
|
|
}
|
|
|
|
session->shutdown_event->raise(true);
|
|
session->controlEnd.raise(true);
|
|
}
|
|
|
|
server->flush();
|
|
}
|
|
|
|
void
|
|
recvThread(broadcast_ctx_t &ctx) {
|
|
std::map<av_session_id_t, message_queue_t> peer_to_video_session;
|
|
std::map<av_session_id_t, message_queue_t> peer_to_audio_session;
|
|
|
|
auto &video_sock = ctx.video_sock;
|
|
auto &audio_sock = ctx.audio_sock;
|
|
|
|
auto &message_queue_queue = ctx.message_queue_queue;
|
|
auto broadcast_shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
|
|
|
|
auto &io = ctx.io;
|
|
|
|
udp::endpoint peer;
|
|
|
|
std::array<char, 2048> buf[2];
|
|
std::function<void(const boost::system::error_code, size_t)> recv_func[2];
|
|
|
|
auto populate_peer_to_session = [&]() {
|
|
while (message_queue_queue->peek()) {
|
|
auto message_queue_opt = message_queue_queue->pop();
|
|
TUPLE_3D_REF(socket_type, session_id, message_queue, *message_queue_opt);
|
|
|
|
switch (socket_type) {
|
|
case socket_e::video:
|
|
if (message_queue) {
|
|
peer_to_video_session.emplace(session_id, message_queue);
|
|
}
|
|
else {
|
|
peer_to_video_session.erase(session_id);
|
|
}
|
|
break;
|
|
case socket_e::audio:
|
|
if (message_queue) {
|
|
peer_to_audio_session.emplace(session_id, message_queue);
|
|
}
|
|
else {
|
|
peer_to_audio_session.erase(session_id);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
};
|
|
|
|
auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map<av_session_id_t, message_queue_t> &peer_to_session) {
|
|
recv_func[buf_elem] = [&, buf_elem](const boost::system::error_code &ec, size_t bytes) {
|
|
auto fg = util::fail_guard([&]() {
|
|
sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func[buf_elem]);
|
|
});
|
|
|
|
auto type_str = buf_elem ? "AUDIO"sv : "VIDEO"sv;
|
|
BOOST_LOG(verbose) << "Recv: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
|
|
|
populate_peer_to_session();
|
|
|
|
// No data, yet no error
|
|
if (ec == boost::system::errc::connection_refused || ec == boost::system::errc::connection_reset) {
|
|
return;
|
|
}
|
|
|
|
if (ec || !bytes) {
|
|
BOOST_LOG(error) << "Couldn't receive data from udp socket: "sv << ec.message();
|
|
return;
|
|
}
|
|
|
|
if (bytes == 4) {
|
|
// For legacy PING packets, find the matching session by address.
|
|
auto it = peer_to_session.find(peer.address());
|
|
if (it != std::end(peer_to_session)) {
|
|
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
|
it->second->raise(peer, std::string { buf[buf_elem].data(), bytes });
|
|
}
|
|
}
|
|
else if (bytes >= sizeof(SS_PING)) {
|
|
auto ping = (PSS_PING) buf[buf_elem].data();
|
|
|
|
// For new PING packets that include a client identifier, search by payload.
|
|
auto it = peer_to_session.find(std::string { ping->payload, sizeof(ping->payload) });
|
|
if (it != std::end(peer_to_session)) {
|
|
BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str;
|
|
it->second->raise(peer, std::string { buf[buf_elem].data(), bytes });
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
recv_func_init(video_sock, 0, peer_to_video_session);
|
|
recv_func_init(audio_sock, 1, peer_to_audio_session);
|
|
|
|
video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]);
|
|
audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]);
|
|
|
|
while (!broadcast_shutdown_event->peek()) {
|
|
io.run();
|
|
}
|
|
}
|
|
|
|
void
|
|
videoBroadcastThread(udp::socket &sock) {
|
|
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
|
|
auto packets = mail::man->queue<video::packet_t>(mail::video_packets);
|
|
auto timebase = boost::posix_time::microsec_clock::universal_time();
|
|
|
|
// Video traffic is sent on this thread
|
|
platf::adjust_thread_priority(platf::thread_priority_e::high);
|
|
|
|
stat_trackers::min_max_avg_tracker<uint16_t> frame_processing_latency_tracker;
|
|
crypto::aes_t iv(12);
|
|
|
|
while (auto packet = packets->pop()) {
|
|
if (shutdown_event->peek()) {
|
|
break;
|
|
}
|
|
|
|
auto session = (session_t *) packet->channel_data;
|
|
auto lowseq = session->video.lowseq;
|
|
|
|
std::string_view payload { (char *) packet->data(), packet->data_size() };
|
|
std::vector<uint8_t> payload_with_replacements;
|
|
|
|
// Apply replacements on the packet payload before performing any other operations.
|
|
// We need to know the final frame size to calculate the last packet size, and we
|
|
// must avoid matching replacements against the frame header or any other non-video
|
|
// part of the payload.
|
|
if (packet->is_idr() && packet->replacements) {
|
|
for (auto &replacement : *packet->replacements) {
|
|
auto frame_old = replacement.old;
|
|
auto frame_new = replacement._new;
|
|
|
|
payload_with_replacements = replace(payload, frame_old, frame_new);
|
|
payload = { (char *) payload_with_replacements.data(), payload_with_replacements.size() };
|
|
}
|
|
}
|
|
|
|
video_short_frame_header_t frame_header = {};
|
|
frame_header.headerType = 0x01; // Short header type
|
|
frame_header.frameType = packet->is_idr() ? 2 :
|
|
packet->after_ref_frame_invalidation ? 5 :
|
|
1;
|
|
frame_header.lastPayloadLen = (payload.size() + sizeof(frame_header)) % (session->config.packetsize - sizeof(NV_VIDEO_PACKET));
|
|
if (frame_header.lastPayloadLen == 0) {
|
|
frame_header.lastPayloadLen = session->config.packetsize - sizeof(NV_VIDEO_PACKET);
|
|
}
|
|
|
|
if (packet->frame_timestamp) {
|
|
auto duration_to_latency = [](const std::chrono::steady_clock::duration &duration) {
|
|
const auto duration_us = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
|
|
return (uint16_t) std::clamp<decltype(duration_us)>((duration_us + 50) / 100, 0, std::numeric_limits<uint16_t>::max());
|
|
};
|
|
|
|
uint16_t latency = duration_to_latency(std::chrono::steady_clock::now() - *packet->frame_timestamp);
|
|
|
|
if (config::sunshine.min_log_level <= 1) {
|
|
// Print frame processing latency stats to debug log every 20 seconds
|
|
auto print_info = [&](uint16_t min_latency, uint16_t max_latency, double avg_latency) {
|
|
auto f = stat_trackers::one_digit_after_decimal();
|
|
BOOST_LOG(debug) << "Frame processing latency (min/max/avg): " << f % (min_latency / 10.) << "ms/" << f % (max_latency / 10.) << "ms/" << f % (avg_latency / 10.) << "ms";
|
|
};
|
|
frame_processing_latency_tracker.collect_and_callback_on_interval(latency, print_info, 20s);
|
|
}
|
|
|
|
frame_header.frame_processing_latency = latency;
|
|
}
|
|
else {
|
|
frame_header.frame_processing_latency = 0;
|
|
}
|
|
|
|
std::vector<uint8_t> payload_new;
|
|
std::copy_n((uint8_t *) &frame_header, sizeof(frame_header), std::back_inserter(payload_new));
|
|
std::copy(std::begin(payload), std::end(payload), std::back_inserter(payload_new));
|
|
|
|
payload = { (char *) payload_new.data(), payload_new.size() };
|
|
|
|
// insert packet headers
|
|
auto blocksize = session->config.packetsize + MAX_RTP_HEADER_SIZE;
|
|
auto payload_blocksize = blocksize - sizeof(video_packet_raw_t);
|
|
|
|
auto fecPercentage = config::stream.fec_percentage;
|
|
|
|
payload_new = insert(sizeof(video_packet_raw_t), payload_blocksize,
|
|
payload, [&](void *p, int fecIndex, int end) {
|
|
video_packet_raw_t *video_packet = (video_packet_raw_t *) p;
|
|
|
|
video_packet->packet.flags = FLAG_CONTAINS_PIC_DATA;
|
|
});
|
|
|
|
payload = std::string_view { (char *) payload_new.data(), payload_new.size() };
|
|
|
|
// With a fecpercentage of 255, if payload_new is broken up into more than a 100 data_shards
|
|
// it will generate greater than DATA_SHARDS_MAX shards.
|
|
// Therefore, we start breaking the data up into three separate fec blocks.
|
|
auto multi_fec_threshold = 90 * blocksize;
|
|
|
|
// We can go up to 4 fec blocks, but 3 is plenty
|
|
constexpr auto MAX_FEC_BLOCKS = 3;
|
|
|
|
std::array<std::string_view, MAX_FEC_BLOCKS> fec_blocks;
|
|
decltype(fec_blocks)::iterator
|
|
fec_blocks_begin = std::begin(fec_blocks),
|
|
fec_blocks_end = std::begin(fec_blocks) + 1;
|
|
|
|
auto lastBlockIndex = 0;
|
|
if (payload.size() > multi_fec_threshold) {
|
|
BOOST_LOG(verbose) << "Generating multiple FEC blocks"sv;
|
|
|
|
// Align individual fec blocks to blocksize
|
|
auto unaligned_size = payload.size() / MAX_FEC_BLOCKS;
|
|
auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize;
|
|
|
|
// Break the data up into 3 blocks, each containing multiple complete video packets.
|
|
fec_blocks[0] = payload.substr(0, aligned_size);
|
|
fec_blocks[1] = payload.substr(aligned_size, aligned_size);
|
|
fec_blocks[2] = payload.substr(aligned_size * 2);
|
|
|
|
lastBlockIndex = 2 << 6;
|
|
fec_blocks_end = std::end(fec_blocks);
|
|
}
|
|
else {
|
|
BOOST_LOG(verbose) << "Generating single FEC block"sv;
|
|
fec_blocks[0] = payload;
|
|
}
|
|
|
|
try {
|
|
auto blockIndex = 0;
|
|
std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view ¤t_payload) {
|
|
auto packets = (current_payload.size() + (blocksize - 1)) / blocksize;
|
|
|
|
for (int x = 0; x < packets; ++x) {
|
|
auto *inspect = (video_packet_raw_t *) ¤t_payload[x * blocksize];
|
|
|
|
inspect->packet.frameIndex = packet->frame_index();
|
|
inspect->packet.streamPacketIndex = ((uint32_t) lowseq + x) << 8;
|
|
|
|
// Match multiFecFlags with Moonlight
|
|
inspect->packet.multiFecFlags = 0x10;
|
|
inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex;
|
|
|
|
if (x == 0) {
|
|
inspect->packet.flags |= FLAG_SOF;
|
|
}
|
|
|
|
if (x == packets - 1) {
|
|
inspect->packet.flags |= FLAG_EOF;
|
|
}
|
|
}
|
|
|
|
// If video encryption is enabled, we allocate space for the encryption header before each shard
|
|
auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets,
|
|
session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0);
|
|
|
|
// set FEC info now that we know for sure what our percentage will be for this frame
|
|
for (auto x = 0; x < shards.size(); ++x) {
|
|
auto *inspect = (video_packet_raw_t *) shards.data(x);
|
|
|
|
// RTP video timestamps use a 90 KHz clock
|
|
auto now = boost::posix_time::microsec_clock::universal_time();
|
|
auto timestamp = (now - timebase).total_microseconds() / (1000 / 90);
|
|
|
|
inspect->packet.fecInfo =
|
|
(x << 12 |
|
|
shards.data_shards << 22 |
|
|
shards.percentage << 4);
|
|
|
|
inspect->rtp.header = 0x80 | FLAG_EXTENSION;
|
|
inspect->rtp.sequenceNumber = util::endian::big<uint16_t>(lowseq + x);
|
|
inspect->rtp.timestamp = util::endian::big<uint32_t>(timestamp);
|
|
|
|
inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex;
|
|
inspect->packet.frameIndex = packet->frame_index();
|
|
|
|
// Encrypt this shard if video encryption is enabled
|
|
if (session->video.cipher) {
|
|
// We use the deterministic IV construction algorithm specified in NIST SP 800-38D
|
|
// Section 8.2.1. The sequence number is our "invocation" field and the 'V' in the
|
|
// high bytes is the "fixed" field. Because each client provides their own unique
|
|
// key, our values in the fixed field need only uniquely identify each independent
|
|
// use of the client's key with AES-GCM in our code.
|
|
//
|
|
// The IV counter is 64 bits long which allows for 2^64 encrypted video packets
|
|
// to be sent to each client before the IV repeats.
|
|
std::copy_n((uint8_t *) &session->video.gcm_iv_counter, sizeof(session->video.gcm_iv_counter), std::begin(iv));
|
|
iv[11] = 'V'; // Video stream
|
|
session->video.gcm_iv_counter++;
|
|
|
|
// Encrypt the target buffer in place
|
|
auto *prefix = (video_packet_enc_prefix_t *) shards.prefix(x);
|
|
prefix->frameNumber = packet->frame_index();
|
|
std::copy(std::begin(iv), std::end(iv), prefix->iv);
|
|
session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, prefix->tag, &iv);
|
|
}
|
|
}
|
|
|
|
auto peer_address = session->video.peer.address();
|
|
auto batch_info = platf::batched_send_info_t {
|
|
shards.shards.begin(),
|
|
shards.prefixsize + shards.blocksize,
|
|
shards.nr_shards,
|
|
(uintptr_t) sock.native_handle(),
|
|
peer_address,
|
|
session->video.peer.port(),
|
|
session->localAddress,
|
|
};
|
|
|
|
// Use a batched send if it's supported on this platform
|
|
if (!platf::send_batch(batch_info)) {
|
|
// Batched send is not available, so send each packet individually
|
|
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
|
|
for (auto x = 0; x < shards.size(); ++x) {
|
|
auto send_info = platf::send_info_t {
|
|
shards.prefix(x),
|
|
shards.prefixsize + shards.blocksize,
|
|
(uintptr_t) sock.native_handle(),
|
|
peer_address,
|
|
session->video.peer.port(),
|
|
session->localAddress,
|
|
};
|
|
|
|
platf::send(send_info);
|
|
}
|
|
}
|
|
|
|
if (packet->is_idr()) {
|
|
BOOST_LOG(verbose) << "Key Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv;
|
|
}
|
|
else {
|
|
BOOST_LOG(verbose) << "Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv << std::endl;
|
|
}
|
|
|
|
++blockIndex;
|
|
lowseq += shards.size();
|
|
});
|
|
|
|
session->video.lowseq = lowseq;
|
|
}
|
|
catch (const std::exception &e) {
|
|
BOOST_LOG(error) << "Broadcast video failed "sv << e.what();
|
|
std::this_thread::sleep_for(100ms);
|
|
}
|
|
}
|
|
|
|
shutdown_event->raise(true);
|
|
}
|
|
|
|
void
|
|
audioBroadcastThread(udp::socket &sock) {
|
|
auto shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
|
|
auto packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
|
|
|
|
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
|
|
|
|
audio_packet_t audio_packet { (audio_packet_raw_t *) malloc(sizeof(audio_packet_raw_t) + max_block_size) };
|
|
fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) };
|
|
crypto::aes_t iv(16);
|
|
|
|
// For unknown reasons, the RS parity matrix computed by our RS implementation
|
|
// doesn't match the one Nvidia uses for audio data. I'm not exactly sure why,
|
|
// but we can simply replace it with the matrix generated by OpenFEC which
|
|
// works correctly. This is possible because the data and FEC shard count is
|
|
// constant and known in advance.
|
|
const unsigned char parity[] = { 0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c };
|
|
memcpy(rs.get()->p, parity, sizeof(parity));
|
|
|
|
audio_packet->rtp.header = 0x80;
|
|
audio_packet->rtp.packetType = 97;
|
|
audio_packet->rtp.ssrc = 0;
|
|
|
|
// Audio traffic is sent on this thread
|
|
platf::adjust_thread_priority(platf::thread_priority_e::high);
|
|
|
|
while (auto packet = packets->pop()) {
|
|
if (shutdown_event->peek()) {
|
|
break;
|
|
}
|
|
|
|
TUPLE_2D_REF(channel_data, packet_data, *packet);
|
|
auto session = (session_t *) channel_data;
|
|
|
|
auto sequenceNumber = session->audio.sequenceNumber;
|
|
auto timestamp = session->audio.timestamp;
|
|
|
|
*(std::uint32_t *) iv.data() = util::endian::big<std::uint32_t>(session->audio.avRiKeyId + sequenceNumber);
|
|
|
|
auto bytes = encode_audio(session->config.encryptionFlagsEnabled & SS_ENC_AUDIO, packet_data, audio_packet, iv, session->audio.cipher);
|
|
if (bytes < 0) {
|
|
BOOST_LOG(error) << "Couldn't encode audio packet"sv;
|
|
break;
|
|
}
|
|
|
|
audio_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber);
|
|
audio_packet->rtp.timestamp = util::endian::big(timestamp);
|
|
|
|
session->audio.sequenceNumber++;
|
|
session->audio.timestamp += session->config.audio.packetDuration;
|
|
|
|
auto &shards_p = session->audio.shards_p;
|
|
|
|
std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]);
|
|
auto peer_address = session->audio.peer.address();
|
|
try {
|
|
auto send_info = platf::send_info_t {
|
|
(const char *) audio_packet.get(),
|
|
sizeof(audio_packet_raw_t) + bytes,
|
|
(uintptr_t) sock.native_handle(),
|
|
peer_address,
|
|
session->audio.peer.port(),
|
|
session->localAddress,
|
|
};
|
|
platf::send(send_info);
|
|
BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv;
|
|
|
|
auto &fec_packet = session->audio.fec_packet;
|
|
// initialize the FEC header at the beginning of the FEC block
|
|
if (sequenceNumber % RTPA_DATA_SHARDS == 0) {
|
|
fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber);
|
|
fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp);
|
|
}
|
|
|
|
// generate parity shards at the end of the FEC block
|
|
if ((sequenceNumber + 1) % RTPA_DATA_SHARDS == 0) {
|
|
reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes);
|
|
|
|
for (auto x = 0; x < RTPA_FEC_SHARDS; ++x) {
|
|
fec_packet->rtp.sequenceNumber = util::endian::big<std::uint16_t>(sequenceNumber + x + 1);
|
|
fec_packet->fecHeader.fecShardIndex = x;
|
|
memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes);
|
|
|
|
auto send_info = platf::send_info_t {
|
|
(const char *) fec_packet.get(),
|
|
sizeof(audio_fec_packet_raw_t) + bytes,
|
|
(uintptr_t) sock.native_handle(),
|
|
peer_address,
|
|
session->audio.peer.port(),
|
|
session->localAddress,
|
|
};
|
|
platf::send(send_info);
|
|
BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv;
|
|
}
|
|
}
|
|
}
|
|
catch (const std::exception &e) {
|
|
BOOST_LOG(error) << "Broadcast audio failed "sv << e.what();
|
|
std::this_thread::sleep_for(100ms);
|
|
}
|
|
}
|
|
|
|
shutdown_event->raise(true);
|
|
}
|
|
|
|
int
|
|
start_broadcast(broadcast_ctx_t &ctx) {
|
|
auto address_family = net::af_from_enum_string(config::sunshine.address_family);
|
|
auto protocol = address_family == net::IPV4 ? udp::v4() : udp::v6();
|
|
auto control_port = map_port(CONTROL_PORT);
|
|
auto video_port = map_port(VIDEO_STREAM_PORT);
|
|
auto audio_port = map_port(AUDIO_STREAM_PORT);
|
|
|
|
if (ctx.control_server.bind(address_family, control_port)) {
|
|
BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << control_port << "], likely another process already bound to the port"sv;
|
|
|
|
return -1;
|
|
}
|
|
|
|
boost::system::error_code ec;
|
|
ctx.video_sock.open(protocol, ec);
|
|
if (ec) {
|
|
BOOST_LOG(fatal) << "Couldn't open socket for Video server: "sv << ec.message();
|
|
|
|
return -1;
|
|
}
|
|
|
|
ctx.video_sock.bind(udp::endpoint(protocol, video_port), ec);
|
|
if (ec) {
|
|
BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << video_port << "]: "sv << ec.message();
|
|
|
|
return -1;
|
|
}
|
|
|
|
ctx.audio_sock.open(protocol, ec);
|
|
if (ec) {
|
|
BOOST_LOG(fatal) << "Couldn't open socket for Audio server: "sv << ec.message();
|
|
|
|
return -1;
|
|
}
|
|
|
|
ctx.audio_sock.bind(udp::endpoint(protocol, audio_port), ec);
|
|
if (ec) {
|
|
BOOST_LOG(fatal) << "Couldn't bind Audio server to port ["sv << audio_port << "]: "sv << ec.message();
|
|
|
|
return -1;
|
|
}
|
|
|
|
ctx.message_queue_queue = std::make_shared<message_queue_queue_t::element_type>(30);
|
|
|
|
ctx.video_thread = std::thread { videoBroadcastThread, std::ref(ctx.video_sock) };
|
|
ctx.audio_thread = std::thread { audioBroadcastThread, std::ref(ctx.audio_sock) };
|
|
ctx.control_thread = std::thread { controlBroadcastThread, &ctx.control_server };
|
|
|
|
ctx.recv_thread = std::thread { recvThread, std::ref(ctx) };
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
end_broadcast(broadcast_ctx_t &ctx) {
|
|
auto broadcast_shutdown_event = mail::man->event<bool>(mail::broadcast_shutdown);
|
|
|
|
broadcast_shutdown_event->raise(true);
|
|
|
|
auto video_packets = mail::man->queue<video::packet_t>(mail::video_packets);
|
|
auto audio_packets = mail::man->queue<audio::packet_t>(mail::audio_packets);
|
|
|
|
// Minimize delay stopping video/audio threads
|
|
video_packets->stop();
|
|
audio_packets->stop();
|
|
|
|
ctx.message_queue_queue->stop();
|
|
ctx.io.stop();
|
|
|
|
ctx.video_sock.close();
|
|
ctx.audio_sock.close();
|
|
|
|
video_packets.reset();
|
|
audio_packets.reset();
|
|
|
|
BOOST_LOG(debug) << "Waiting for main listening thread to end..."sv;
|
|
ctx.recv_thread.join();
|
|
BOOST_LOG(debug) << "Waiting for main video thread to end..."sv;
|
|
ctx.video_thread.join();
|
|
BOOST_LOG(debug) << "Waiting for main audio thread to end..."sv;
|
|
ctx.audio_thread.join();
|
|
BOOST_LOG(debug) << "Waiting for main control thread to end..."sv;
|
|
ctx.control_thread.join();
|
|
BOOST_LOG(debug) << "All broadcasting threads ended"sv;
|
|
|
|
broadcast_shutdown_event->reset();
|
|
}
|
|
|
|
int
|
|
recv_ping(session_t *session, decltype(broadcast)::ptr_t ref, socket_e type, std::string_view expected_payload, udp::endpoint &peer, std::chrono::milliseconds timeout) {
|
|
auto messages = std::make_shared<message_queue_t::element_type>(30);
|
|
av_session_id_t session_id = std::string { expected_payload };
|
|
|
|
// Only allow matches on the peer address for legacy clients
|
|
if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
|
|
ref->message_queue_queue->raise(type, peer.address(), messages);
|
|
}
|
|
ref->message_queue_queue->raise(type, session_id, messages);
|
|
|
|
auto fg = util::fail_guard([&]() {
|
|
messages->stop();
|
|
|
|
// remove message queue from session
|
|
if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1)) {
|
|
ref->message_queue_queue->raise(type, peer.address(), nullptr);
|
|
}
|
|
ref->message_queue_queue->raise(type, session_id, nullptr);
|
|
});
|
|
|
|
auto start_time = std::chrono::steady_clock::now();
|
|
auto current_time = start_time;
|
|
|
|
while (current_time - start_time < config::stream.ping_timeout) {
|
|
auto delta_time = current_time - start_time;
|
|
|
|
auto msg_opt = messages->pop(config::stream.ping_timeout - delta_time);
|
|
if (!msg_opt) {
|
|
break;
|
|
}
|
|
|
|
TUPLE_2D_REF(recv_peer, msg, *msg_opt);
|
|
if (msg.find(expected_payload) != std::string::npos) {
|
|
// Match the new PING payload format
|
|
BOOST_LOG(debug) << "Received ping [v2] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
|
}
|
|
else if (!(session->config.mlFeatureFlags & ML_FF_SESSION_ID_V1) && msg == "PING"sv) {
|
|
// Match the legacy fixed PING payload only if the new type is not supported
|
|
BOOST_LOG(debug) << "Received ping [v1] from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
|
}
|
|
else {
|
|
BOOST_LOG(debug) << "Received non-ping from "sv << recv_peer.address() << ':' << recv_peer.port() << " ["sv << util::hex_vec(msg) << ']';
|
|
current_time = std::chrono::steady_clock::now();
|
|
continue;
|
|
}
|
|
|
|
// Update connection details.
|
|
peer = recv_peer;
|
|
return 0;
|
|
}
|
|
|
|
BOOST_LOG(error) << "Initial Ping Timeout"sv;
|
|
return -1;
|
|
}
|
|
|
|
void
|
|
videoThread(session_t *session) {
|
|
auto fg = util::fail_guard([&]() {
|
|
session::stop(*session);
|
|
});
|
|
|
|
while_starting_do_nothing(session->state);
|
|
|
|
auto ref = broadcast.ref();
|
|
auto error = recv_ping(session, ref, socket_e::video, session->video.ping_payload, session->video.peer, config::stream.ping_timeout);
|
|
if (error < 0) {
|
|
return;
|
|
}
|
|
|
|
// Enable QoS tagging on video traffic if requested by the client
|
|
if (session->config.videoQosType) {
|
|
auto address = session->video.peer.address();
|
|
session->video.qos = platf::enable_socket_qos(ref->video_sock.native_handle(), address,
|
|
session->video.peer.port(), platf::qos_data_type_e::video);
|
|
}
|
|
|
|
BOOST_LOG(debug) << "Start capturing Video"sv;
|
|
video::capture(session->mail, session->config.monitor, session);
|
|
}
|
|
|
|
void
|
|
audioThread(session_t *session) {
|
|
auto fg = util::fail_guard([&]() {
|
|
session::stop(*session);
|
|
});
|
|
|
|
while_starting_do_nothing(session->state);
|
|
|
|
auto ref = broadcast.ref();
|
|
auto error = recv_ping(session, ref, socket_e::audio, session->audio.ping_payload, session->audio.peer, config::stream.ping_timeout);
|
|
if (error < 0) {
|
|
return;
|
|
}
|
|
|
|
// Enable QoS tagging on audio traffic if requested by the client
|
|
if (session->config.audioQosType) {
|
|
auto address = session->audio.peer.address();
|
|
session->audio.qos = platf::enable_socket_qos(ref->audio_sock.native_handle(), address,
|
|
session->audio.peer.port(), platf::qos_data_type_e::audio);
|
|
}
|
|
|
|
BOOST_LOG(debug) << "Start capturing Audio"sv;
|
|
audio::capture(session->mail, session->config.audio, session);
|
|
}
|
|
|
|
namespace session {
|
|
// Number of sessions that have been started with start() and not yet joined with join().
// start() and join() use the 0 <-> 1 transitions to invoke the platform streaming callbacks.
std::atomic_uint running_sessions;
|
|
|
|
/**
 * @brief Read the current lifecycle state of a session.
 *
 * @param session Session to query.
 * @return The state as observed by a relaxed atomic load.
 */
state_e
state(session_t &session) {
  const auto current = session.state.load(std::memory_order_relaxed);
  return current;
}
|
|
|
|
void
|
|
stop(session_t &session) {
|
|
while_starting_do_nothing(session.state);
|
|
auto expected = state_e::RUNNING;
|
|
auto already_stopping = !session.state.compare_exchange_strong(expected, state_e::STOPPING);
|
|
if (already_stopping) {
|
|
return;
|
|
}
|
|
|
|
session.shutdown_event->raise(true);
|
|
}
|
|
|
|
void
|
|
join(session_t &session) {
|
|
// Current Nvidia drivers have a bug where NVENC can deadlock the encoder thread with hardware-accelerated
|
|
// GPU scheduling enabled. If this happens, we will terminate ourselves and the service can restart.
|
|
// The alternative is that Sunshine can never start another session until it's manually restarted.
|
|
auto task = []() {
|
|
BOOST_LOG(fatal) << "Hang detected! Session failed to terminate in 10 seconds."sv;
|
|
log_flush();
|
|
std::abort();
|
|
};
|
|
auto force_kill = task_pool.pushDelayed(task, 10s).task_id;
|
|
auto fg = util::fail_guard([&force_kill]() {
|
|
// Cancel the kill task if we manage to return from this function
|
|
task_pool.cancel(force_kill);
|
|
});
|
|
|
|
BOOST_LOG(debug) << "Waiting for video to end..."sv;
|
|
session.videoThread.join();
|
|
BOOST_LOG(debug) << "Waiting for audio to end..."sv;
|
|
session.audioThread.join();
|
|
BOOST_LOG(debug) << "Waiting for control to end..."sv;
|
|
session.controlEnd.view();
|
|
// Reset input on session stop to avoid stuck repeated keys
|
|
BOOST_LOG(debug) << "Resetting Input..."sv;
|
|
input::reset(session.input);
|
|
|
|
// If this is the last session, invoke the platform callbacks
|
|
if (--running_sessions == 0) {
|
|
#if defined SUNSHINE_TRAY && SUNSHINE_TRAY >= 1
|
|
if (proc::proc.running()) {
|
|
system_tray::update_tray_pausing(proc::proc.get_last_run_app_name());
|
|
}
|
|
#endif
|
|
platf::streaming_will_stop();
|
|
}
|
|
|
|
BOOST_LOG(debug) << "Session ended"sv;
|
|
}
|
|
|
|
int
|
|
start(session_t &session, const std::string &addr_string) {
|
|
session.input = input::alloc(session.mail);
|
|
|
|
session.broadcast_ref = broadcast.ref();
|
|
if (!session.broadcast_ref) {
|
|
return -1;
|
|
}
|
|
|
|
session.control.expected_peer_address = addr_string;
|
|
BOOST_LOG(debug) << "Expecting incoming session connections from "sv << addr_string;
|
|
|
|
// Insert this session into the session list
|
|
{
|
|
auto lg = session.broadcast_ref->control_server._sessions.lock();
|
|
session.broadcast_ref->control_server._sessions->push_back(&session);
|
|
}
|
|
|
|
auto addr = boost::asio::ip::make_address(addr_string);
|
|
session.video.peer.address(addr);
|
|
session.video.peer.port(0);
|
|
|
|
session.audio.peer.address(addr);
|
|
session.audio.peer.port(0);
|
|
|
|
session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
|
|
|
|
session.audioThread = std::thread { audioThread, &session };
|
|
session.videoThread = std::thread { videoThread, &session };
|
|
|
|
session.state.store(state_e::RUNNING, std::memory_order_relaxed);
|
|
|
|
// If this is the first session, invoke the platform callbacks
|
|
if (++running_sessions == 1) {
|
|
platf::streaming_will_start();
|
|
#if defined SUNSHINE_TRAY && SUNSHINE_TRAY >= 1
|
|
system_tray::update_tray_playing(proc::proc.get_last_run_app_name());
|
|
#endif
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
std::shared_ptr<session_t>
|
|
alloc(config_t &config, rtsp_stream::launch_session_t &launch_session) {
|
|
auto session = std::make_shared<session_t>();
|
|
|
|
auto mail = std::make_shared<safe::mail_raw_t>();
|
|
|
|
session->shutdown_event = mail->event<bool>(mail::shutdown);
|
|
|
|
session->config = config;
|
|
|
|
session->control.connect_data = launch_session.control_connect_data;
|
|
session->control.feedback_queue = mail->queue<platf::gamepad_feedback_msg_t>(mail::gamepad_feedback);
|
|
session->control.hdr_queue = mail->event<video::hdr_info_t>(mail::hdr);
|
|
session->control.legacy_input_enc_iv = launch_session.iv;
|
|
session->control.cipher = crypto::cipher::gcm_t {
|
|
launch_session.gcm_key, false
|
|
};
|
|
|
|
session->video.idr_events = mail->event<bool>(mail::idr);
|
|
session->video.invalidate_ref_frames_events = mail->event<std::pair<int64_t, int64_t>>(mail::invalidate_ref_frames);
|
|
session->video.lowseq = 0;
|
|
session->video.ping_payload = launch_session.av_ping_payload;
|
|
if (config.encryptionFlagsEnabled & SS_ENC_VIDEO) {
|
|
BOOST_LOG(info) << "Video encryption enabled"sv;
|
|
session->video.cipher = crypto::cipher::gcm_t {
|
|
launch_session.gcm_key, false
|
|
};
|
|
session->video.gcm_iv_counter = 0;
|
|
}
|
|
|
|
constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048);
|
|
|
|
util::buffer_t<char> shards { RTPA_TOTAL_SHARDS * max_block_size };
|
|
util::buffer_t<uint8_t *> shards_p { RTPA_TOTAL_SHARDS };
|
|
|
|
for (auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) {
|
|
shards_p[x] = (uint8_t *) &shards[x * max_block_size];
|
|
}
|
|
|
|
// Audio FEC spans multiple audio packets,
|
|
// therefore its session specific
|
|
session->audio.shards = std::move(shards);
|
|
session->audio.shards_p = std::move(shards_p);
|
|
|
|
session->audio.fec_packet.reset((audio_fec_packet_raw_t *) malloc(sizeof(audio_fec_packet_raw_t) + max_block_size));
|
|
|
|
session->audio.fec_packet->rtp.header = 0x80;
|
|
session->audio.fec_packet->rtp.packetType = 127;
|
|
session->audio.fec_packet->rtp.timestamp = 0;
|
|
session->audio.fec_packet->rtp.ssrc = 0;
|
|
|
|
session->audio.fec_packet->fecHeader.payloadType = 97;
|
|
session->audio.fec_packet->fecHeader.ssrc = 0;
|
|
|
|
session->audio.cipher = crypto::cipher::cbc_t {
|
|
launch_session.gcm_key, true
|
|
};
|
|
|
|
session->audio.ping_payload = launch_session.av_ping_payload;
|
|
session->audio.avRiKeyId = util::endian::big(*(std::uint32_t *) launch_session.iv.data());
|
|
session->audio.sequenceNumber = 0;
|
|
session->audio.timestamp = 0;
|
|
|
|
session->control.peer = nullptr;
|
|
session->state.store(state_e::STOPPED, std::memory_order_relaxed);
|
|
|
|
session->mail = std::move(mail);
|
|
|
|
return session;
|
|
}
|
|
} // namespace session
|
|
} // namespace stream
|