diff --git a/src/nvhttp.cpp b/src/nvhttp.cpp index 037503c9..285df69d 100644 --- a/src/nvhttp.cpp +++ b/src/nvhttp.cpp @@ -141,6 +141,7 @@ namespace nvhttp { // uniqueID, session std::unordered_map map_id_sess; std::unordered_map map_id_client; + std::atomic session_id_counter; using args_t = SimpleWeb::CaseInsensitiveMultimap; using resp_https_t = std::shared_ptr::Response>; @@ -267,41 +268,43 @@ namespace nvhttp { } } - rtsp_stream::launch_session_t + std::shared_ptr make_launch_session(bool host_audio, const args_t &args) { - rtsp_stream::launch_session_t launch_session; + auto launch_session = std::make_shared(); + + launch_session->id = ++session_id_counter; auto rikey = util::from_hex_vec(get_arg(args, "rikey"), true); - std::copy(rikey.cbegin(), rikey.cend(), std::back_inserter(launch_session.gcm_key)); + std::copy(rikey.cbegin(), rikey.cend(), std::back_inserter(launch_session->gcm_key)); - launch_session.host_audio = host_audio; + launch_session->host_audio = host_audio; std::stringstream mode = std::stringstream(get_arg(args, "mode", "0x0x0")); // Split mode by the char "x", to populate width/height/fps int x = 0; std::string segment; while (std::getline(mode, segment, 'x')) { - if (x == 0) launch_session.width = atoi(segment.c_str()); - if (x == 1) launch_session.height = atoi(segment.c_str()); - if (x == 2) launch_session.fps = atoi(segment.c_str()); + if (x == 0) launch_session->width = atoi(segment.c_str()); + if (x == 1) launch_session->height = atoi(segment.c_str()); + if (x == 2) launch_session->fps = atoi(segment.c_str()); x++; } - launch_session.unique_id = (get_arg(args, "uniqueid", "unknown")); - launch_session.appid = util::from_view(get_arg(args, "appid", "unknown")); - launch_session.enable_sops = util::from_view(get_arg(args, "sops", "0")); - launch_session.surround_info = util::from_view(get_arg(args, "surroundAudioInfo", "196610")); - launch_session.gcmap = util::from_view(get_arg(args, "gcmap", "0")); - launch_session.enable_hdr = util::from_view(get_arg(args, "hdrMode", "0")); + launch_session->unique_id = (get_arg(args, "uniqueid", "unknown")); + launch_session->appid = util::from_view(get_arg(args, "appid", "unknown")); + launch_session->enable_sops = util::from_view(get_arg(args, "sops", "0")); + launch_session->surround_info = util::from_view(get_arg(args, "surroundAudioInfo", "196610")); + launch_session->gcmap = util::from_view(get_arg(args, "gcmap", "0")); + launch_session->enable_hdr = util::from_view(get_arg(args, "hdrMode", "0")); // Generate the unique identifiers for this connection that we will send later during RTSP handshake unsigned char raw_payload[8]; RAND_bytes(raw_payload, sizeof(raw_payload)); - launch_session.av_ping_payload = util::hex_vec(raw_payload); - RAND_bytes((unsigned char *) &launch_session.control_connect_data, sizeof(launch_session.control_connect_data)); + launch_session->av_ping_payload = util::hex_vec(raw_payload); + RAND_bytes((unsigned char *) &launch_session->control_connect_data, sizeof(launch_session->control_connect_data)); - launch_session.iv.resize(16); + launch_session->iv.resize(16); uint32_t prepend_iv = util::endian::big(util::from_view(get_arg(args, "rikeyid"))); auto prepend_iv_p = (uint8_t *) &prepend_iv; - std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session.iv)); + std::copy(prepend_iv_p, prepend_iv_p + sizeof(prepend_iv), std::begin(launch_session->iv)); return launch_session; } diff --git a/src/process.cpp b/src/process.cpp index 7042dd00..050c58ee 100644 --- a/src/process.cpp +++ b/src/process.cpp @@ -136,7 +136,7 @@ namespace proc { } int - proc_t::execute(int app_id, rtsp_stream::launch_session_t launch_session) { + proc_t::execute(int app_id, std::shared_ptr launch_session) { // Ensure starting from a clean slate terminate(); @@ -157,14 +157,14 @@ namespace proc { // Add Stream-specific environment variables _env["SUNSHINE_APP_ID"] = std::to_string(_app_id); _env["SUNSHINE_APP_NAME"] = _app.name; - _env["SUNSHINE_CLIENT_WIDTH"] = std::to_string(launch_session.width); - _env["SUNSHINE_CLIENT_HEIGHT"] = std::to_string(launch_session.height); - _env["SUNSHINE_CLIENT_FPS"] = std::to_string(launch_session.fps); - _env["SUNSHINE_CLIENT_HDR"] = launch_session.enable_hdr ? "true" : "false"; - _env["SUNSHINE_CLIENT_GCMAP"] = std::to_string(launch_session.gcmap); - _env["SUNSHINE_CLIENT_HOST_AUDIO"] = launch_session.host_audio ? "true" : "false"; - _env["SUNSHINE_CLIENT_ENABLE_SOPS"] = launch_session.enable_sops ? "true" : "false"; - int channelCount = launch_session.surround_info & (65535); + _env["SUNSHINE_CLIENT_WIDTH"] = std::to_string(launch_session->width); + _env["SUNSHINE_CLIENT_HEIGHT"] = std::to_string(launch_session->height); + _env["SUNSHINE_CLIENT_FPS"] = std::to_string(launch_session->fps); + _env["SUNSHINE_CLIENT_HDR"] = launch_session->enable_hdr ? "true" : "false"; + _env["SUNSHINE_CLIENT_GCMAP"] = std::to_string(launch_session->gcmap); + _env["SUNSHINE_CLIENT_HOST_AUDIO"] = launch_session->host_audio ? "true" : "false"; + _env["SUNSHINE_CLIENT_ENABLE_SOPS"] = launch_session->enable_sops ? "true" : "false"; + int channelCount = launch_session->surround_info & (65535); switch (channelCount) { case 2: _env["SUNSHINE_CLIENT_AUDIO_CONFIGURATION"] = "2.0"; diff --git a/src/process.h b/src/process.h index 433c7669..c8754992 100644 --- a/src/process.h +++ b/src/process.h @@ -75,7 +75,7 @@ namespace proc { _apps(std::move(apps)) {} int - execute(int app_id, rtsp_stream::launch_session_t launch_session); + execute(int app_id, std::shared_ptr launch_session); /** * @return _app_id if a process is running, otherwise returns 0 diff --git a/src/rtsp.cpp b/src/rtsp.cpp index e92e177a..5fa0e024 100644 --- a/src/rtsp.cpp +++ b/src/rtsp.cpp @@ -44,18 +44,18 @@ namespace rtsp_stream { class rtsp_server_t; using msg_t = util::safe_ptr; - using cmd_func_t = std::function; + using cmd_func_t = std::function; void print_msg(PRTSP_MESSAGE msg); void - cmd_not_found(tcp::socket &sock, msg_t &&req); + cmd_not_found(tcp::socket &sock, launch_session_t &, msg_t &&req); void - respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload); + respond(tcp::socket &sock, launch_session_t &session, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload); class socket_t: public std::enable_shared_from_this { public: - socket_t(boost::asio::io_service &ios, std::function &&handle_data_fn): + socket_t(boost::asio::io_service &ios, std::function &&handle_data_fn): handle_data_fn { std::move(handle_data_fn) }, sock { ios } {} void @@ -63,7 +63,7 @@ namespace rtsp_stream { if (begin == std::end(msg_buf)) { BOOST_LOG(error) << "RTSP: read(): Exceeded maximum rtsp packet size: "sv << msg_buf.size(); - respond(sock, nullptr, 400, "BAD REQUEST", 0, {}); + respond(sock, *session, nullptr, 400, "BAD REQUEST", 0, {}); sock.close(); @@ -83,7 +83,7 @@ namespace rtsp_stream { if (begin == std::end(msg_buf)) { BOOST_LOG(error) << "RTSP: read_payload(): Exceeded maximum rtsp packet size: "sv << msg_buf.size(); - respond(sock, nullptr, 400, "BAD REQUEST", 0, {}); + respond(sock, *session, nullptr, 400, "BAD REQUEST", 0, {}); sock.close(); @@ -122,7 +122,7 @@ namespace rtsp_stream { if (auto status = parseRtspMessage(req.get(), socket->msg_buf.data(), (std::size_t)(end - socket->msg_buf.data()))) { BOOST_LOG(error) << "Malformed RTSP message: ["sv << status << ']'; - respond(socket->sock, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(socket->sock, *socket->session, nullptr, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } @@ -206,10 +206,10 @@ namespace rtsp_stream { void handle_data(msg_t &&req) { - handle_data_fn(sock, std::move(req)); + handle_data_fn(sock, *session, std::move(req)); } - std::function handle_data_fn; + std::function handle_data_fn; tcp::socket sock; @@ -217,6 +217,8 @@ namespace rtsp_stream { char *crlf; char *begin = msg_buf.data(); + + std::shared_ptr session; }; class rtsp_server_t { @@ -251,8 +253,8 @@ namespace rtsp_stream { return -1; } - next_socket = std::make_shared(ios, [this](tcp::socket &sock, msg_t &&msg) { - handle_msg(sock, std::move(msg)); + next_socket = std::make_shared(ios, [this](tcp::socket &sock, launch_session_t &session, msg_t &&msg) { + handle_msg(sock, session, std::move(msg)); }); acceptor.async_accept(next_socket->sock, [this](const auto &ec) { @@ -269,13 +271,13 @@ namespace rtsp_stream { } void - handle_msg(tcp::socket &sock, msg_t &&req) { + handle_msg(tcp::socket &sock, launch_session_t &session, msg_t &&req) { auto func = _map_cmd_cb.find(req->message.request.command); if (func != std::end(_map_cmd_cb)) { - func->second(this, sock, std::move(req)); + func->second(this, sock, session, std::move(req)); } else { - cmd_not_found(sock, std::move(req)); + cmd_not_found(sock, session, std::move(req)); } sock.shutdown(boost::asio::socket_base::shutdown_type::shutdown_both); @@ -291,12 +293,17 @@ namespace rtsp_stream { return; } - auto socket = std::move(next_socket); - socket->read(); + auto launch_session { launch_event.view() }; + if (launch_session) { + // Associate the current RTSP session with this socket and start reading + auto socket = std::move(next_socket); + socket->session = launch_session; + socket->read(); - next_socket = std::make_shared(ios, [this](tcp::socket &sock, msg_t &&msg) { - handle_msg(sock, std::move(msg)); - }); + next_socket = std::make_shared(ios, [this](tcp::socket &sock, launch_session_t &session, msg_t &&msg) { + handle_msg(sock, session, std::move(msg)); + }); + } acceptor.async_accept(next_socket->sock, [this](const auto &ec) { handle_accept(ec); @@ -313,16 +320,9 @@ namespace rtsp_stream { * @note If the client does not begin streaming within the ping_timeout, * the session will be discarded. * @param launch_session Streaming session information. - * - * EXAMPLES: - * ```cpp - * launch_session_t launch_session; - * rtsp_server_t server {}; - * server.session_raise(launch_session); - * ``` */ void - session_raise(rtsp_stream::launch_session_t launch_session) { + session_raise(std::shared_ptr launch_session) { auto now = std::chrono::steady_clock::now(); // If a launch event is still pending, don't overwrite it. @@ -332,7 +332,26 @@ namespace rtsp_stream { raised_timeout = now + config::stream.ping_timeout; --_slot_count; - launch_event.raise(launch_session); + launch_event.raise(std::move(launch_session)); + } + + /** + * @brief Clear state for the oldest launch session. + * @param launch_session_id The ID of the session to clear. + */ + void + session_clear(uint32_t launch_session_id) { + // We currently only support a single pending RTSP session, + // so the ID should always match the one for that session. + auto launch_session = launch_event.view(); + if (launch_session) { + if (launch_session->id != launch_session_id) { + BOOST_LOG(error) << "Attempted to clear unexpected session: "sv << launch_session_id << " vs "sv << launch_session->id; + } + else { + launch_event.pop(); + } + } } int @@ -340,7 +359,7 @@ namespace rtsp_stream { return config::stream.channels - _slot_count; } - safe::event_t launch_event; + safe::event_t> launch_event; /** * @brief Clear launch sessions. @@ -420,8 +439,17 @@ namespace rtsp_stream { rtsp_server_t server {}; void - launch_session_raise(rtsp_stream::launch_session_t launch_session) { - server.session_raise(launch_session); + launch_session_raise(std::shared_ptr launch_session) { + server.session_raise(std::move(launch_session)); + } + + /** + * @brief Clear state for the specified launch session. + * @param launch_session_id The ID of the session to clear. + */ + void + launch_session_clear(uint32_t launch_session_id) { + server.session_clear(launch_session_id); } int @@ -450,7 +478,7 @@ namespace rtsp_stream { } void - respond(tcp::socket &sock, msg_t &resp) { + respond(tcp::socket &sock, launch_session_t &session, msg_t &resp) { auto payload = std::make_pair(resp->payload, resp->payloadLength); // Restore response message for proper destruction @@ -480,20 +508,20 @@ namespace rtsp_stream { } void - respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { + respond(tcp::socket &sock, launch_session_t &session, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { msg_t resp { new msg_t::element_type }; createRtspResponse(resp.get(), nullptr, 0, const_cast("RTSP/1.0"), statuscode, const_cast(status_msg), seqn, options, const_cast(payload.data()), (int) payload.size()); - respond(sock, resp); + respond(sock, session, resp); } void - cmd_not_found(tcp::socket &sock, msg_t &&req) { - respond(sock, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); + cmd_not_found(tcp::socket &sock, launch_session_t &session, msg_t &&req) { + respond(sock, session, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); } void - cmd_option(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { + cmd_option(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified @@ -502,11 +530,11 @@ namespace rtsp_stream { auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - respond(sock, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, session, &option, 200, "OK", req->sequenceNumber, {}); } void - cmd_describe(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { + cmd_describe(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified @@ -587,11 +615,11 @@ namespace rtsp_stream { ss << std::endl; } - respond(sock, &option, 200, "OK", req->sequenceNumber, ss.str()); + respond(sock, session, &option, 200, "OK", req->sequenceNumber, ss.str()); } void - cmd_setup(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { + cmd_setup(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) { OPTION_ITEM options[4] {}; auto &seqn = options[0]; @@ -604,14 +632,6 @@ namespace rtsp_stream { auto seqn_str = std::to_string(req->sequenceNumber); seqn.content = const_cast(seqn_str.c_str()); - if (!server->launch_event.peek()) { - // /launch has not been used - - respond(sock, &seqn, 503, "Service Unavailable", req->sequenceNumber, {}); - return; - } - auto launch_session { server->launch_event.view() }; - std::string_view target { req->message.request.target }; auto begin = std::find(std::begin(target), std::end(target), '=') + 1; auto end = std::find(begin, std::end(target), '/'); @@ -628,7 +648,7 @@ namespace rtsp_stream { port = map_port(stream::CONTROL_PORT); } else { - cmd_not_found(sock, std::move(req)); + cmd_not_found(sock, session, std::move(req)); return; } @@ -647,23 +667,23 @@ namespace rtsp_stream { port_option.content = port_value.data(); // Send identifiers that will be echoed in the other connections - auto connect_data = std::to_string(launch_session->control_connect_data); + auto connect_data = std::to_string(session.control_connect_data); if (type == "control"sv) { payload_option.option = const_cast("X-SS-Connect-Data"); payload_option.content = connect_data.data(); } else { payload_option.option = const_cast("X-SS-Ping-Payload"); - payload_option.content = launch_session->av_ping_payload.data(); + payload_option.content = session.av_ping_payload.data(); } port_option.next = &payload_option; - respond(sock, &seqn, 200, "OK", req->sequenceNumber, {}); + respond(sock, session, &seqn, 200, "OK", req->sequenceNumber, {}); } void - cmd_announce(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { + cmd_announce(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified @@ -672,14 +692,6 @@ namespace rtsp_stream { auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - if (!server->launch_event.peek()) { - // /launch has not been used - - respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {}); - return; - } - auto launch_session { server->launch_event.pop() }; - std::string_view payload { req->payload, (size_t) req->payloadLength }; std::vector lines; @@ -739,7 +751,7 @@ namespace rtsp_stream { stream::config_t config; std::int64_t configuredBitrateKbps; - config.audio.flags[audio::config_t::HOST_AUDIO] = launch_session->host_audio; + config.audio.flags[audio::config_t::HOST_AUDIO] = session.host_audio; try { config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv)); config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv)); @@ -774,7 +786,7 @@ namespace rtsp_stream { configuredBitrateKbps = util::from_view(args.at("x-ml-video.configuredBitrateKbps"sv)); } catch (std::out_of_range &) { - respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } @@ -820,14 +832,14 @@ namespace rtsp_stream { if (config.monitor.videoFormat == 1 && video::active_hevc_mode == 1) { BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv; - respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } if (config.monitor.videoFormat == 2 && video::active_av1_mode == 1) { BOOST_LOG(warning) << "AV1 is disabled, yet the client requested AV1"sv; - respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(sock, session, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } @@ -844,33 +856,33 @@ namespace rtsp_stream { (config.encryptionFlagsEnabled & (SS_ENC_VIDEO | SS_ENC_AUDIO)) != (SS_ENC_VIDEO | SS_ENC_AUDIO)) { BOOST_LOG(error) << "Rejecting client that cannot comply with mandatory encryption requirement"sv; - respond(sock, &option, 403, "Forbidden", req->sequenceNumber, {}); + respond(sock, session, &option, 403, "Forbidden", req->sequenceNumber, {}); return; } - auto session = stream::session::alloc(config, *launch_session); + auto stream_session = stream::session::alloc(config, session); - auto slot = server->accept(session); + auto slot = server->accept(stream_session); if (!slot) { BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']'; - respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {}); + respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {}); return; } - if (stream::session::start(*session, sock.remote_endpoint().address().to_string())) { + if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) { BOOST_LOG(error) << "Failed to start a streaming session"sv; server->clear(slot); - respond(sock, &option, 500, "Internal Server Error", req->sequenceNumber, {}); + respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {}); return; } - respond(sock, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, session, &option, 200, "OK", req->sequenceNumber, {}); } void - cmd_play(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { + cmd_play(rtsp_server_t *server, tcp::socket &sock, launch_session_t &session, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified @@ -879,7 +891,7 @@ namespace rtsp_stream { auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - respond(sock, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, session, &option, 200, "OK", req->sequenceNumber, {}); } void @@ -891,7 +903,6 @@ namespace rtsp_stream { server.map("DESCRIBE"sv, &cmd_describe); server.map("SETUP"sv, &cmd_setup); server.map("ANNOUNCE"sv, &cmd_announce); - server.map("PLAY"sv, &cmd_play); boost::system::error_code ec; diff --git a/src/rtsp.h b/src/rtsp.h index 3cdd5c7c..01674059 100644 --- a/src/rtsp.h +++ b/src/rtsp.h @@ -13,6 +13,8 @@ namespace rtsp_stream { constexpr auto RTSP_SETUP_PORT = 21; struct launch_session_t { + uint32_t id; + crypto::aes_t gcm_key; crypto::aes_t iv; @@ -32,7 +34,11 @@ namespace rtsp_stream { }; void - launch_session_raise(launch_session_t launch_session); + launch_session_raise(std::shared_ptr launch_session); + + void + launch_session_clear(uint32_t launch_session_id); + int session_count(); diff --git a/src/stream.cpp b/src/stream.cpp index 41f80e2e..f99e68c1 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -411,6 +411,8 @@ namespace stream { safe::mail_raw_t::event_t hdr_queue; } control; + std::uint32_t launch_session_id; + safe::mail_raw_t::event_t shutdown_event; safe::signal_t controlEnd; @@ -523,6 +525,9 @@ namespace stream { } } + // Once the control stream connection is established, RTSP session state can be torn down + rtsp_stream::launch_session_clear(session_p->launch_session_id); + session_p->control.peer = peer; // Use the local address from the control connection as the source address @@ -1881,6 +1886,7 @@ namespace stream { auto mail = std::make_shared(); session->shutdown_event = mail->event(mail::shutdown); + session->launch_session_id = launch_session.id; session->config = config;