From 4b216d66765920f5fcbcfcc34192077dd5263b72 Mon Sep 17 00:00:00 2001 From: loki Date: Sat, 8 Feb 2020 23:41:27 +0100 Subject: [PATCH] Fix bad function call --- sunshine/nvhttp.cpp | 2 +- sunshine/process.cpp | 2 +- sunshine/rtsp.cpp | 34 ++------ sunshine/stream.cpp | 180 +++++++++++++++++++++++++++++++++-------- sunshine/stream.h | 71 +--------------- sunshine/thread_safe.h | 19 +++-- 6 files changed, 168 insertions(+), 140 deletions(-) diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index b7fffc1f..4e03ad44 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -528,7 +528,7 @@ void launch(resp_https_t response, req_https_t request) { }); auto args = request->parse_query_string(); - auto appid = util::from_view(args.at("appid")) -2; + auto appid = util::from_view(args.at("appid")) -1; auto current_appid = proc::proc.running(); if(current_appid != -1) { diff --git a/sunshine/process.cpp b/sunshine/process.cpp index fe39a765..5eef5090 100644 --- a/sunshine/process.cpp +++ b/sunshine/process.cpp @@ -48,7 +48,7 @@ int proc_t::execute(int app_id) { _app_id = -1; } - if(app_id >= _apps.size()) { + if(app_id < 0 || app_id >= _apps.size()) { BOOST_LOG(error) << "Couldn't find app with ID ["sv << app_id << ']'; return 404; diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp index 2b09d1e6..fa10729d 100644 --- a/sunshine/rtsp.cpp +++ b/sunshine/rtsp.cpp @@ -122,19 +122,8 @@ public: continue; } - // Wait until the session is properly running - while (session->state == state_e::STARTING) { - std::this_thread::sleep_for(1ms); - } - ::stream::stop(*session); - - BOOST_LOG(debug) << "Waiting for Audio to end..."sv; - session->audioThread.join(); - BOOST_LOG(debug) << "Waiting for Video to end..."sv; - session->videoThread.join(); - - input::reset(input); + ::stream::join(*session); } } @@ -347,10 +336,8 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { args.try_emplace("x-nv-video[0].dynamicRangeMode"sv, "0"sv); args.try_emplace("x-nv-aqos.packetDuration"sv, "5"sv); - auto session = std::make_shared(); + config_t config; try { - - auto &config = session->config; config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv)); config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv)); config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv)); @@ -373,13 +360,14 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { return; } - if(session->config.monitor.videoFormat != 0 && config::video.hevc_mode == 0) { + if(config.monitor.videoFormat != 0 && config::video.hevc_mode == 0) { BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv; respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } + auto session = alloc_session(config, launch_session->gcm_key, launch_session->iv); if(!server->accept(session)) { BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']'; @@ -387,20 +375,8 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { return; } - auto &gcm_key = launch_session->gcm_key; - auto &iv = launch_session->iv; + start_session(session, platf::from_sockaddr((sockaddr*)&peer->address.address)); - std::copy(std::begin(gcm_key), std::end(gcm_key), std::begin(session->gcm_key)); - std::copy(std::begin(iv), std::end(iv), std::begin(session->iv)); - - session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - - session->idr_events = std::make_shared(); - - session->audioThread = std::thread {audioThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)}; - session->videoThread = std::thread {videoThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)}; - - session->state.store(state_e::RUNNING); respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); } diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index cdc78e25..d9ec7042 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -43,10 +43,32 @@ static const short packetTypes[] = { 0x0100, // Termination }; +constexpr auto VIDEO_STREAM_PORT = 47998; +constexpr auto CONTROL_PORT = 47999; +constexpr auto AUDIO_STREAM_PORT = 48000; + +namespace asio = boost::asio; +namespace sys = boost::system; + +using asio::ip::tcp; +using asio::ip::udp; + using namespace std::literals; namespace stream { +enum class socket_e : int { + video, + audio +}; + +enum class state_e : int { + STOPPED, + STOPPING, + STARTING, + RUNNING, +}; + #pragma pack(push, 1) struct video_packet_raw_t { @@ -72,19 +94,59 @@ using rh_t = util::safe_ptr; using video_packet_t = util::c_ptr; using audio_packet_t = util::c_ptr; +using message_queue_t = std::shared_ptr>>; +using message_queue_queue_t = std::shared_ptr>>; +using session_queue_t = std::shared_ptr>>>; + +struct broadcast_ctx_t { + safe::event_t shutdown_event; + + video::packet_queue_t video_packets; + audio::packet_queue_t audio_packets; + + message_queue_queue_t message_queue_queue; + session_queue_t session_queue; + + std::thread video_thread; + std::thread audio_thread; + std::thread control_thread; + + std::thread recv_thread; + + asio::io_service io; + udp::socket video_sock { io, udp::endpoint(udp::v4(), VIDEO_STREAM_PORT) }; + udp::socket audio_sock { io, udp::endpoint(udp::v4(), AUDIO_STREAM_PORT) }; +}; + +struct session_t { + config_t config; + + std::thread audioThread; + std::thread videoThread; + + std::chrono::steady_clock::time_point pingTimeout; + + safe::shared_t::ptr_t broadcast_ref; + udp::endpoint video_peer; + udp::endpoint audio_peer; + + video::idr_event_t idr_events; + + crypto::aes_t gcm_key; + crypto::aes_t iv; + + std::atomic state; +}; + +void videoThread(std::shared_ptr session, std::string addr_str); +void audioThread(std::shared_ptr session, std::string addr_str); + int start_broadcast(broadcast_ctx_t &ctx); void end_broadcast(broadcast_ctx_t &ctx); std::shared_ptr input; static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); -void stop(session_t &session) { - session.idr_events->stop(); - - auto expected = state_e::RUNNING; - session.state.compare_exchange_strong(expected, state_e::STOPPING); -} - class control_server_t { public: control_server_t(control_server_t &&) noexcept = default; @@ -379,6 +441,8 @@ void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t payload[1] = reason; server.send(std::string_view {(char*)payload.data(), payload.size()}); + + //TODO: Terminate session } server.iterate(500ms); @@ -388,44 +452,21 @@ void controlBroadcastThread(safe::event_t *shutdown_event, session_queue_t void recvThread(broadcast_ctx_t &ctx) { std::map peer_to_video_session; std::map peer_to_audio_session; - std::map peer_to_control_session; auto &video_sock = ctx.video_sock; auto &audio_sock = ctx.audio_sock; - auto &session_queue = ctx.message_queue_queue; + auto &message_queue_queue = ctx.message_queue_queue; auto &io = ctx.io; udp::endpoint peer; std::array buf[2]; + std::function recv_func[2]; - auto recv_func_factory = [&](udp::socket &sock, int buf_elem, std::map &peer_to_session) { - std::function recv_func = [&](const boost::system::error_code &ec, size_t bytes) { - if(ec || !bytes) { - BOOST_LOG(fatal) << "Couldn't receive data from udp socket: "sv << ec.message(); - - log_flush(); - std::abort(); - } - - auto it = peer_to_session.find(peer.address()); - if(it != std::end(peer_to_session)) { - it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes }); - } - - sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func); - }; - - return recv_func; - }; - - video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func_factory(video_sock, 0, peer_to_video_session)); - audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func_factory(audio_sock, 1, peer_to_audio_session)); - - while(!ctx.shutdown_event.peek()) { - while(session_queue->peek()) { - auto message_queue_opt = session_queue->pop(); + auto populate_peer_to_session = [&]() { + while(message_queue_queue->peek()) { + auto message_queue_opt = message_queue_queue->pop(); TUPLE_3D_REF(socket_type, addr, message_queue, *message_queue_opt); switch(socket_type) { @@ -447,6 +488,36 @@ void recvThread(broadcast_ctx_t &ctx) { break; } } + }; + + auto recv_func_init = [&](udp::socket &sock, int buf_elem, std::map &peer_to_session) { + recv_func[buf_elem] = [&,buf_elem](const boost::system::error_code &ec, size_t bytes) { + populate_peer_to_session(); + + if(ec || !bytes) { + BOOST_LOG(fatal) << "Couldn't receive data from udp socket: "sv << ec.message(); + + log_flush(); + std::abort(); + } + + auto it = peer_to_session.find(peer.address()); + if(it != std::end(peer_to_session)) { + it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes }); + } + + sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func[buf_elem]); + }; + }; + + recv_func_init(video_sock, 0, peer_to_video_session); + recv_func_init(audio_sock, 1, peer_to_audio_session); + + video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]); + audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]); + + while(!ctx.shutdown_event.peek()) { + io.run_one(); } @@ -696,4 +767,43 @@ void audioThread(std::shared_ptr session, std::string addr_str) { audio::capture(ref->audio_packets, session->config.audio, session.get()); } + +void stop(session_t &session) { + session.idr_events->stop(); + + auto expected = state_e::RUNNING; + session.state.compare_exchange_strong(expected, state_e::STOPPING); +} + +void join(session_t &session) { + BOOST_LOG(debug) << "Waiting for video to end..."sv; + session.videoThread.join(); + BOOST_LOG(debug) << "Waiting for audio to end..."sv; + session.audioThread.join(); +} + +void start_session(std::shared_ptr session, const std::string &addr_string) { + session->broadcast_ref = broadcast.ref(); + session->broadcast_ref->session_queue->raise(addr_string, session); + + session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; + + session->audioThread = std::thread {audioThread, session, addr_string}; + session->videoThread = std::thread {videoThread, session, addr_string}; + + session->state.store(state_e::RUNNING, std::memory_order_relaxed); +} + +std::shared_ptr alloc_session(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) { + auto session = std::make_shared(); + + session->config = config; + session->gcm_key = gcm_key; + session->iv = iv; + + session->idr_events = std::make_shared(); + session->state.store(state_e::STOPPED, std::memory_order_relaxed); + + return session; +} } diff --git a/sunshine/stream.h b/sunshine/stream.h index 45f3bb63..29fbdd82 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -7,7 +7,6 @@ #include -#include "thread_safe.h" #include "video.h" #include "audio.h" #include "crypto.h" @@ -17,33 +16,7 @@ struct input_t; } namespace stream { -constexpr auto VIDEO_STREAM_PORT = 47998; -constexpr auto CONTROL_PORT = 47999; -constexpr auto AUDIO_STREAM_PORT = 48000; - -namespace asio = boost::asio; -namespace sys = boost::system; - -using asio::ip::tcp; -using asio::ip::udp; - -enum class socket_e : int { - video, - audio -}; - -enum class state_e : int { - STOPPED, - STOPPING, - STARTING, - RUNNING, -}; - struct session_t; -using message_queue_t = std::shared_ptr>>; -using message_queue_queue_t = std::shared_ptr>>; -using session_queue_t = std::shared_ptr>>>; - struct config_t { audio::config_t audio; video::config_t monitor; @@ -53,49 +26,11 @@ struct config_t { std::optional gcmap; }; -struct broadcast_ctx_t { - safe::event_t shutdown_event; - - video::packet_queue_t video_packets; - audio::packet_queue_t audio_packets; - - message_queue_queue_t message_queue_queue; - session_queue_t session_queue; - - std::thread video_thread; - std::thread audio_thread; - std::thread control_thread; - - std::thread recv_thread; - - asio::io_service io; - udp::socket video_sock { io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT) }; - udp::socket audio_sock { io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT) }; -}; - -struct session_t { - config_t config; - - std::thread audioThread; - std::thread videoThread; - - std::chrono::steady_clock::time_point pingTimeout; - - udp::endpoint video_peer; - udp::endpoint audio_peer; - - video::idr_event_t idr_events; - - crypto::aes_t gcm_key; - crypto::aes_t iv; - - std::atomic state; -}; - -void videoThread(std::shared_ptr session, std::string addr_str); -void audioThread(std::shared_ptr session, std::string addr_str); +std::shared_ptr alloc_session(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv); +void start_session(std::shared_ptr session, const std::string &addr_string); void stop(session_t &session); +void join(session_t &session); extern std::shared_ptr input; } diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index 41a16714..21eeba5f 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -217,22 +217,29 @@ public: struct ptr_t { shared_t *owner; + ptr_t() : owner { nullptr } {} explicit ptr_t(shared_t *owner) : owner { owner } {} - ptr_t(ptr_t &&ptr) noexcept { - owner = ptr.owner; - + ptr_t(ptr_t &&ptr) noexcept : owner { ptr.owner } { ptr.owner = nullptr; } - ptr_t(const ptr_t &ptr) noexcept { - auto tmp = ptr.owner->ref(); + ptr_t(const ptr_t &ptr) noexcept : owner { ptr.owner } { + if(!owner) { + return; + } - owner = tmp.owner; + auto tmp = ptr.owner->ref(); tmp.owner = nullptr; } ptr_t &operator=(const ptr_t &ptr) noexcept { + if(!ptr.owner) { + release(); + + return *this; + } + return *this = std::move(*ptr.owner->ref()); }