Refactor RTSP handling to be session-based rather than socket-based

This is required to support per-session attributes like encryption keys during RTSP message processing.
This commit is contained in:
Cameron Gutman 2024-02-02 20:05:56 -06:00
parent e62d6915db
commit ca29eac53a
6 changed files with 130 additions and 104 deletions

View File

@ -141,6 +141,7 @@ namespace nvhttp {
// uniqueID, session
std::unordered_map<std::string, pair_session_t> map_id_sess;
std::unordered_map<std::string, client_t> map_id_client;
std::atomic<uint32_t> session_id_counter;
using args_t = SimpleWeb::CaseInsensitiveMultimap;
using resp_https_t = std::shared_ptr<typename SimpleWeb::ServerBase<SimpleWeb::HTTPS>::Response>;
@ -267,41 +268,43 @@ namespace nvhttp {
}
}
rtsp_stream::launch_session_t
std::shared_ptr<rtsp_stream::launch_session_t>
make_launch_session(bool host_audio, const args_t &args) {
rtsp_stream::launch_session_t launch_session;
auto launch_session = std::make_shared<rtsp_stream::launch_session_t>();
launch_session->id = ++session_id_counter;
auto rikey = util::from_hex_vec(get_arg(args, "rikey"), true);
std::copy(rikey.cbegin(), rikey.cend(), std::back_inserter(launch_session.gcm_key));
std::copy(rikey.cbegin(), rikey.cend(), std::back_inserter(launch_session->gcm_key));
launch_session.host_audio = host_audio;
launch_session->host_audio = host_audio;
std::stringstream mode = std::stringstream(get_arg(args, "mode", "0x0x0"));
// Split mode by the char "x", to populate width/height/fps
int x = 0;
std::string segment;
while (std::getline(mode, segment, 'x')) {
if (x == 0) launch_session.width = atoi(segment.c_str());
if (x == 1) launch_session.height = atoi(segment.c_str());
if (x == 2) launch_session.fps = atoi(segment.c_str());
if (x == 0) launch_session->width = atoi(segment.c_str());
if (x == 1) launch_session->height = atoi(segment.c_str());
if (x == 2) launch_session->fps = atoi(segment.c_str());
x++;
}
launch_session.unique_id = (get_arg(args, "uniqueid", "unknown"));
launch_session.appid = util::from_view(get_arg(args, "appid", "unknown"));
launch_session.enable_sops = util::from_view(get_arg(args, "sops", "0"));
launch_session.surround_info = util::from_view(get_arg(args, "surroundAudioInfo", "196610"));
launch_session.gcmap = util::from_view(get_arg(args, "gcmap", "0"));
launch_session.enable_hdr = util::from_view(get_arg(args, "hdrMode", "0"));
launch_session->unique_id = (get_arg(args, "uniqueid", "unknown"));
launch_session->appid = util::from_view(get_arg(args, "appid", "unknown"));
launch_session->enable_sops = util::from_view(get_arg(args, "sops", "0"));
launch_session->surround_info = util::from_view(get_arg(args, "surroundAudioInfo", "196610"));
launch_session->gcmap = util::from_view(get_arg(args, "gcmap", "0"));
launch_session->enable_hdr = util::from_view(get_arg(args, "hdrMode", "0"));
// Generate the unique identifiers for this connection that we will send later during RTSP handshake
unsigned char raw_payload[8];
RAND_bytes(raw_payload, sizeof(raw_payload));
launch_session.av_ping_payload = util::hex_vec(raw_payload);
RAND_bytes((unsigned char *) &launch_session.control_connect_data, sizeof(launch_session.control_connect_data));
launch_session->av_ping_payload = util::hex_vec(raw_payload);
RAND_bytes((unsigned char *) &launch_session->control_connect_data, sizeof(launch_session->control_connect_data));
launch_session.iv.resize(16);
launch_session->iv.resize(16);
uint32_t prepend_iv = util::endian::big<uint32_t>(util::from_view(get_arg(args, "rikeyid")));
auto prepend_iv_p = (uint8_t *) &prepend_iv;
std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv));
std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session->iv));
return launch_session;
}

View File

@ -136,7 +136,7 @@ namespace proc {
}
int
proc_t::execute(int app_id, rtsp_stream::launch_session_t launch_session) {
proc_t::execute(int app_id, std::shared_ptr<rtsp_stream::launch_session_t> launch_session) {
// Ensure starting from a clean slate
terminate();
@ -157,14 +157,14 @@ namespace proc {
// Add Stream-specific environment variables
_env["SUNSHINE_APP_ID"] = std::to_string(_app_id);
_env["SUNSHINE_APP_NAME"] = _app.name;
_env["SUNSHINE_CLIENT_WIDTH"] = std::to_string(launch_session.width);
_env["SUNSHINE_CLIENT_HEIGHT"] = std::to_string(launch_session.height);
_env["SUNSHINE_CLIENT_FPS"] = std::to_string(launch_session.fps);
_env["SUNSHINE_CLIENT_HDR"] = launch_session.enable_hdr ? "true" : "false";
_env["SUNSHINE_CLIENT_GCMAP"] = std::to_string(launch_session.gcmap);
_env["SUNSHINE_CLIENT_HOST_AUDIO"] = launch_session.host_audio ? "true" : "false";
_env["SUNSHINE_CLIENT_ENABLE_SOPS"] = launch_session.enable_sops ? "true" : "false";
int channelCount = launch_session.surround_info & (65535);
_env["SUNSHINE_CLIENT_WIDTH"] = std::to_string(launch_session->width);
_env["SUNSHINE_CLIENT_HEIGHT"] = std::to_string(launch_session->height);
_env["SUNSHINE_CLIENT_FPS"] = std::to_string(launch_session->fps);
_env["SUNSHINE_CLIENT_HDR"] = launch_session->enable_hdr ? "true" : "false";
_env["SUNSHINE_CLIENT_GCMAP"] = std::to_string(launch_session->gcmap);
_env["SUNSHINE_CLIENT_HOST_AUDIO"] = launch_session->host_audio ? "true" : "false";
_env["SUNSHINE_CLIENT_ENABLE_SOPS"] = launch_session->enable_sops ? "true" : "false";
int channelCount = launch_session->surround_info & (65535);
switch (channelCount) {
case 2:
_env["SUNSHINE_CLIENT_AUDIO_CONFIGURATION"] = "2.0";

View File

@ -75,7 +75,7 @@ namespace proc {
_apps(std::move(apps)) {}
int
execute(int app_id, rtsp_stream::launch_session_t launch_session);
execute(int app_id, std::shared_ptr<rtsp_stream::launch_session_t> launch_session);
/**
* @return _app_id if a process is running, otherwise returns 0

View File

@ -44,18 +44,18 @@ namespace rtsp_stream {
class rtsp_server_t;
using msg_t = util::safe_ptr<RTSP_MESSAGE, free_msg>;
using cmd_func_t = std::function<void(rtsp_server_t *server, tcp::socket &, msg_t &&)>;
using cmd_func_t = std::function<void(rtsp_server_t *server, tcp::socket &, launch_session_t &, msg_t &&)>;
void
print_msg(PRTSP_MESSAGE msg);
void
cmd_not_found(tcp::socket &sock, msg_t &&req);
cmd_not_found(tcp::socket &sock, launch_session_t &, msg_t &&req);
void
respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload);
respond(tcp::socket &sock, launch_session_t &session, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload);
class socket_t: public std::enable_shared_from_this<socket_t> {
public:
socket_t(boost::asio::io_service &ios, std::function<void(tcp::socket &sock, msg_t &&)> &&handle_data_fn):
socket_t(boost::asio::io_service &ios, std::function<void(tcp::socket &sock, launch_session_t &, msg_t &&)> &&handle_data_fn):
handle_data_fn { std::move(handle_data_fn) }, sock { ios } {}
void
@ -63,7 +63,7 @@ namespace rtsp_stream {
if (begin == std::end(msg_buf)) {
BOOST_LOG(error) << "RTSP: read(): Exceeded maximum rtsp packet size: "sv << msg_buf.size();
respond(sock, nullptr, 400, "BAD REQUEST", 0, {});
respond(sock, *session, nullptr, 400, "BAD REQUEST", 0, {});
sock.close();
@ -83,7 +83,7 @@ namespace rtsp_stream {
if (begin == std::end(msg_buf)) {
BOOST_LOG(error) << "RTSP: read_payload(): Exceeded maximum rtsp packet size: "sv << msg_buf.size();
respond(sock, nullptr, 400, "BAD REQUEST", 0, {});
respond(sock, *session, nullptr, 400, "BAD REQUEST", 0, {});
sock.close();
@ -122,7 +122,7 @@ namespace rtsp_stream {
if (auto status = parseRtspMessage(req.get(), socket->msg_buf.data(), (std::size_t)(end - socket->msg_buf.data()))) {
BOOST_LOG(error) << "Malformed RTSP message: ["sv << status << ']';
respond(socket->sock, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {});
respond(socket->sock, *socket->session, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
@ -206,10 +206,10 @@ namespace rtsp_stream {
void
handle_data(msg_t &&req) {
handle_data_fn(sock, std::move(req));
handle_data_fn(sock, *session, std::move(req));
}
std::function<void(tcp::socket &sock, msg_t &&)> handle_data_fn;
std::function<void(tcp::socket &sock, launch_session_t &, msg_t &&)> handle_data_fn;
tcp::socket sock;
@ -217,6 +217,8 @@ namespace rtsp_stream {
char *crlf;
char *begin = msg_buf.data();
std::shared_ptr<launch_session_t> session;
};
class rtsp_server_t {
@ -251,8 +253,8 @@ namespace rtsp_stream {
return -1;
}
next_socket = std::make_shared<socket_t>(ios, [this](tcp::socket &sock, msg_t &&msg) {
handle_msg(sock, std::move(msg));
next_socket = std::make_shared<socket_t>(ios, [this](tcp::socket &sock, launch_session_t &session, msg_t &&msg) {
handle_msg(sock, session, std::move(msg));
});
acceptor.async_accept(next_socket->sock, [this](const auto &ec) {
@ -269,13 +271,13 @@ namespace rtsp_stream {
}
void
handle_msg(tcp::socket &sock, msg_t &&req) {
handle_msg(tcp::socket &sock, launch_session_t &session, msg_t &&req) {
auto func = _map_cmd_cb.find(req->message.request.command);
if (func != std::end(_map_cmd_cb)) {
func->second(this, sock, std::move(req));
func->second(this, sock, session, std::move(req));
}
else {
cmd_not_found(sock, std::move(req));
cmd_not_found(sock, session, std::move(req));
}
sock.shutdown(boost::asio::socket_base::shutdown_type::shutdown_both);
@ -291,12 +293,17 @@ namespace rtsp_stream {
return;
}
auto socket = std::move(next_socket);
socket->read();
auto launch_session { launch_event.view() };
if (launch_session) {
// Associate the current RTSP session with this socket and start reading
auto socket = std::move(next_socket);
socket->session = launch_session;
socket->read();
next_socket = std::make_shared<socket_t>(ios, [this](tcp::socket &sock, msg_t &&msg) {
handle_msg(sock, std::move(msg));
});
next_socket = std::make_shared<socket_t>(ios, [this](tcp::socket &sock, launch_session_t &session, msg_t &&msg) {
handle_msg(sock, session, std::move(msg));
});
}
acceptor.async_accept(next_socket->sock, [this](const auto &ec) {
handle_accept(ec);
@ -313,16 +320,9 @@ namespace rtsp_stream {
* @note If the client does not begin streaming within the ping_timeout,
* the session will be discarded.
* @param launch_session Streaming session information.
*
* EXAMPLES:
* ```cpp
* launch_session_t launch_session;
* rtsp_server_t server {};
* server.session_raise(launch_session);
* ```
*/
void
session_raise(rtsp_stream::launch_session_t launch_session) {
session_raise(std::shared_ptr<launch_session_t> launch_session) {
auto now = std::chrono::steady_clock::now();
// If a launch event is still pending, don't overwrite it.
@ -332,7 +332,26 @@ namespace rtsp_stream {
raised_timeout = now + config::stream.ping_timeout;
--_slot_count;
launch_event.raise(launch_session);
launch_event.raise(std::move(launch_session));
}
/**
* @brief Clear state for the oldest launch session.
* @param launch_session_id The ID of the session to clear.
*/
void
session_clear(uint32_t launch_session_id) {
// We currently only support a single pending RTSP session,
// so the ID should always match the one for that session.
auto launch_session = launch_event.view();
if (launch_session) {
if (launch_session->id != launch_session_id) {
BOOST_LOG(error) << "Attempted to clear unexpected session: "sv << launch_session_id << " vs "sv << launch_session->id;
}
else {
launch_event.pop();
}
}
}
int
@ -340,7 +359,7 @@ namespace rtsp_stream {
return config::stream.channels - _slot_count;
}
safe::event_t<rtsp_stream::launch_session_t> launch_event;
safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
/**
* @brief Clear launch sessions.
@ -420,8 +439,17 @@ namespace rtsp_stream {
rtsp_server_t server {};
void
launch_session_raise(rtsp_stream::launch_session_t launch_session) {
server.session_raise(launch_session);
launch_session_raise(std::shared_ptr<launch_session_t> launch_session) {
server.session_raise(std::move(launch_session));
}
/**
* @brief Clear state for the specified launch session.
* @param launch_session_id The ID of the session to clear.
*/
void
launch_session_clear(uint32_t launch_session_id) {
server.session_clear(launch_session_id);
}
int
@ -450,7 +478,7 @@ namespace rtsp_stream {
}
void
respond(tcp::socket &sock, msg_t &resp) {
respond(tcp::socket &sock, launch_session_t &session, msg_t &resp) {
auto payload = std::make_pair(resp->payload, resp->payloadLength);
// Restore response message for proper destruction
@ -480,20 +508,20 @@ namespace rtsp_stream {
}
void
respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) {
respond(tcp::socket &sock, launch_session_t &session, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) {
msg_t resp { new msg_t::element_type };
createRtspResponse(resp.get(), nullptr, 0, const_cast<char *>("RTSP/1.0"), statuscode, const_cast<char *>(status_msg), seqn, options, const_cast<char *>(payload.data()), (int) payload.size());
respond(sock, resp);
respond(sock, session, resp);
}
void
cmd_not_found(tcp::socket &sock, msg_t &&req) {
respond(sock, nullptr, 404, "NOT FOUND", req->sequenceNumber, {});
cmd_not_found(tcp::socket &sock, launch_session_t &session, msg_t &&req) {
respond(sock, session, nullptr, 404, "NOT FOUND", req->sequenceNumber, {});
}
void
cmd_option(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
cmd_option(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
@ -502,11 +530,11 @@ namespace rtsp_stream {
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char *>(seqn_str.c_str());
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
respond(sock, session, &option, 200, "OK", req->sequenceNumber, {});
}
void
cmd_describe(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
cmd_describe(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
@ -587,11 +615,11 @@ namespace rtsp_stream {
ss << std::endl;
}
respond(sock, &option, 200, "OK", req->sequenceNumber, ss.str());
respond(sock, session, &option, 200, "OK", req->sequenceNumber, ss.str());
}
void
cmd_setup(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
cmd_setup(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) {
OPTION_ITEM options[4] {};
auto &seqn = options[0];
@ -604,14 +632,6 @@ namespace rtsp_stream {
auto seqn_str = std::to_string(req->sequenceNumber);
seqn.content = const_cast<char *>(seqn_str.c_str());
if (!server->launch_event.peek()) {
// /launch has not been used
respond(sock, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
auto launch_session { server->launch_event.view() };
std::string_view target { req->message.request.target };
auto begin = std::find(std::begin(target), std::end(target), '=') + 1;
auto end = std::find(begin, std::end(target), '/');
@ -628,7 +648,7 @@ namespace rtsp_stream {
port = map_port(stream::CONTROL_PORT);
}
else {
cmd_not_found(sock, std::move(req));
cmd_not_found(sock, session, std::move(req));
return;
}
@ -647,23 +667,23 @@ namespace rtsp_stream {
port_option.content = port_value.data();
// Send identifiers that will be echoed in the other connections
auto connect_data = std::to_string(launch_session->control_connect_data);
auto connect_data = std::to_string(session.control_connect_data);
if (type == "control"sv) {
payload_option.option = const_cast<char *>("X-SS-Connect-Data");
payload_option.content = connect_data.data();
}
else {
payload_option.option = const_cast<char *>("X-SS-Ping-Payload");
payload_option.content = launch_session->av_ping_payload.data();
payload_option.content = session.av_ping_payload.data();
}
port_option.next = &payload_option;
respond(sock, &seqn, 200, "OK", req->sequenceNumber, {});
respond(sock, session, &seqn, 200, "OK", req->sequenceNumber, {});
}
void
cmd_announce(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
cmd_announce(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
@ -672,14 +692,6 @@ namespace rtsp_stream {
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char *>(seqn_str.c_str());
if (!server->launch_event.peek()) {
// /launch has not been used
respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
auto launch_session { server->launch_event.pop() };
std::string_view payload { req->payload, (size_t) req->payloadLength };
std::vector<std::string_view> lines;
@ -739,7 +751,7 @@ namespace rtsp_stream {
stream::config_t config;
std::int64_t configuredBitrateKbps;
config.audio.flags[audio::config_t::HOST_AUDIO] = launch_session->host_audio;
config.audio.flags[audio::config_t::HOST_AUDIO] = session.host_audio;
try {
config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv));
config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv));
@ -774,7 +786,7 @@ namespace rtsp_stream {
configuredBitrateKbps = util::from_view(args.at("x-ml-video.configuredBitrateKbps"sv));
}
catch (std::out_of_range &) {
respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
@ -820,14 +832,14 @@ namespace rtsp_stream {
if (config.monitor.videoFormat == 1 && video::active_hevc_mode == 1) {
BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv;
respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
if (config.monitor.videoFormat == 2 && video::active_av1_mode == 1) {
BOOST_LOG(warning) << "AV1 is disabled, yet the client requested AV1"sv;
respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
@ -844,33 +856,33 @@ namespace rtsp_stream {
(config.encryptionFlagsEnabled & (SS_ENC_VIDEO | SS_ENC_AUDIO)) != (SS_ENC_VIDEO | SS_ENC_AUDIO)) {
BOOST_LOG(error) << "Rejecting client that cannot comply with mandatory encryption requirement"sv;
respond(sock, &option, 403, "Forbidden", req->sequenceNumber, {});
respond(sock, session, &option, 403, "Forbidden", req->sequenceNumber, {});
return;
}
auto session = stream::session::alloc(config, *launch_session);
auto stream_session = stream::session::alloc(config, session);
auto slot = server->accept(session);
auto slot = server->accept(stream_session);
if (!slot) {
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';
respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {});
respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
if (stream::session::start(*session, sock.remote_endpoint().address().to_string())) {
if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
BOOST_LOG(error) << "Failed to start a streaming session"sv;
server->clear(slot);
respond(sock, &option, 500, "Internal Server Error", req->sequenceNumber, {});
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
return;
}
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
respond(sock, session, &option, 200, "OK", req->sequenceNumber, {});
}
void
cmd_play(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) {
cmd_play(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
@ -879,7 +891,7 @@ namespace rtsp_stream {
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char *>(seqn_str.c_str());
respond(sock, &option, 200, "OK", req->sequenceNumber, {});
respond(sock, session, &option, 200, "OK", req->sequenceNumber, {});
}
void
@ -891,7 +903,6 @@ namespace rtsp_stream {
server.map("DESCRIBE"sv, &cmd_describe);
server.map("SETUP"sv, &cmd_setup);
server.map("ANNOUNCE"sv, &cmd_announce);
server.map("PLAY"sv, &cmd_play);
boost::system::error_code ec;

View File

@ -13,6 +13,8 @@ namespace rtsp_stream {
constexpr auto RTSP_SETUP_PORT = 21;
struct launch_session_t {
uint32_t id;
crypto::aes_t gcm_key;
crypto::aes_t iv;
@ -32,7 +34,11 @@ namespace rtsp_stream {
};
void
launch_session_raise(launch_session_t launch_session);
launch_session_raise(std::shared_ptr<launch_session_t> launch_session);
void
launch_session_clear(uint32_t launch_session_id);
int
session_count();

View File

@ -411,6 +411,8 @@ namespace stream {
safe::mail_raw_t::event_t<video::hdr_info_t> hdr_queue;
} control;
std::uint32_t launch_session_id;
safe::mail_raw_t::event_t<bool> shutdown_event;
safe::signal_t controlEnd;
@ -523,6 +525,9 @@ namespace stream {
}
}
// Once the control stream connection is established, RTSP session state can be torn down
rtsp_stream::launch_session_clear(session_p->launch_session_id);
session_p->control.peer = peer;
// Use the local address from the control connection as the source address
@ -1881,6 +1886,7 @@ namespace stream {
auto mail = std::make_shared<safe::mail_raw_t>();
session->shutdown_event = mail->event<bool>(mail::shutdown);
session->launch_session_id = launch_session.id;
session->config = config;