diff --git a/sunshine/main.cpp b/sunshine/main.cpp index 3398f694..eb845862 100644 --- a/sunshine/main.cpp +++ b/sunshine/main.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,18 @@ void log_flush() { sink->flush(); } +std::map> signal_handlers; +void on_signal_forwarder(int sig) { + signal_handlers.at(sig)(); +} + +template +void on_signal(int sig, FN &&fn) { + signal_handlers.emplace(sig, std::forward(fn)); + + std::signal(sig, on_signal_forwarder); +} + int main(int argc, char *argv[]) { if(argc > 1) { if(!std::filesystem::exists(argv[1])) { @@ -96,6 +109,13 @@ int main(int argc, char *argv[]) { bl::core::get()->add_sink(sink); auto fg = util::fail_guard(log_flush); + // Create signal handler after logging has been initialized + auto shutdown_event = std::make_shared>(); + on_signal(SIGINT, [shutdown_event]() { + BOOST_LOG(info) << "Interrupt handler called"sv; + shutdown_event->raise(true); + }); + auto proc_opt = proc::parse(config::stream.file_apps); if(!proc_opt) { return 7; @@ -107,9 +127,9 @@ int main(int argc, char *argv[]) { task_pool.start(1); - std::thread httpThread { nvhttp::start }; + std::thread httpThread { nvhttp::start, shutdown_event }; + stream::rtpThread(shutdown_event); - stream::rtpThread(); httpThread.join(); return 0; diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index dff5ba49..68e410b6 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -519,7 +519,7 @@ void launch(resp_https_t response, req_https_t request) { stream::launch_session_t launch_session; - if(stream::has_session) { + if(stream::session_state != stream::state_e::STOPPED) { tree.put("root..status_code", 503); tree.put("root.gamesession", 0); @@ -576,7 +576,7 @@ void resume(resp_https_t response, req_https_t request) { }); auto current_appid = proc::proc.running(); - if(current_appid == -1 || stream::has_session) { + if(current_appid == -1 || stream::session_state != stream::state_e::STOPPED) { tree.put("root.resume", 0); tree.put("root..status_code", 503); @@ -620,7 +620,7 @@ void cancel(resp_https_t response, req_https_t request) { return; } - if(stream::has_session) { + if(stream::session_state != stream::state_e::STOPPED) { tree.put("root..status_code", 503); tree.put("root.cancel", 0); @@ -640,7 +640,7 @@ void appasset(resp_https_t response, req_https_t request) { response->write(SimpleWeb::StatusCode::success_ok, in); } -void start() { +void start(std::shared_ptr> shutdown_event) { local_ip = platf::get_local_ip(); origin_pin_allowed = net::from_enum_string(config::nvhttp.origin_pin_allowed); @@ -734,6 +734,12 @@ void start() { std::thread ssl { &https_server_t::start, &https_server }; std::thread tcp { &http_server_t::start, &http_server }; + // Wait for any event + shutdown_event->view(); + + https_server.stop(); + http_server.stop(); + ssl.join(); tcp.join(); } diff --git a/sunshine/nvhttp.h b/sunshine/nvhttp.h index 50bd0a0b..2e19f70d 100644 --- a/sunshine/nvhttp.h +++ b/sunshine/nvhttp.h @@ -8,12 +8,14 @@ #include #include +#include "thread_safe.h" + #define CA_DIR SUNSHINE_ASSETS_DIR "/demoCA" #define PRIVATE_KEY_FILE CA_DIR "/cakey.pem" #define CERTIFICATE_FILE CA_DIR "/cacert.pem" namespace nvhttp { -void start(); +void start(std::shared_ptr> shutdown_event); } #endif //SUNSHINE_NVHTTP_H diff --git a/sunshine/platform/common.h b/sunshine/platform/common.h index 1f48254c..c7216fc4 100644 --- a/sunshine/platform/common.h +++ b/sunshine/platform/common.h @@ -70,6 +70,7 @@ public: virtual ~mic_t() = default; }; + void freeInput(void*); using input_t = util::safe_ptr; diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 6fa65aaf..95de649d 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -94,7 +94,6 @@ struct audio_packet_raw_t { safe::event_t launch_event; -//FIXME: This smells bad std::shared_ptr input; struct config_t { @@ -123,7 +122,7 @@ struct session_t { bool has_process; } session; -std::atomic_bool has_session; +std::atomic session_state; void free_host(ENetHost *host) { std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) { @@ -165,8 +164,8 @@ void stop(session_t &session) { session.video_packets->stop(); session.audio_packets->stop(); - input::reset(input); - has_session.store(false); + auto expected = state_e::RUNNING; + session_state.compare_exchange_strong(expected, state_e::STOPPING); } class rtsp_server_t { @@ -549,7 +548,11 @@ void controlThread(video::idr_event_t idr_events) { input::passthrough(input, std::move(plaintext)); }); - while(has_session) { + while(session_state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + + while(session_state == state_e::RUNNING) { if(std::chrono::steady_clock::now() > session.pingTimeout) { BOOST_LOG(debug) << "Ping timeout"sv; @@ -641,6 +644,10 @@ std::optional recv_peer(std::shared_ptr> &queue, } void audioThread() { + while(session_state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + auto &config = session.config; asio::io_service io; @@ -676,6 +683,10 @@ void audioThread() { } void videoThread(video::idr_event_t idr_events) { + while(session_state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + auto &config = session.config; int lowseq = 0; @@ -908,9 +919,16 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - if(has_session || !launch_event.peek()) { + auto expected_state = state_e::STOPPED; + auto abort = !session_state.compare_exchange_strong(expected_state, state_e::STARTING); + + if(abort || !launch_event.peek()) { //Either already streaming or /launch has not been used + if(!abort) { + session_state.store(state_e::STOPPED); + } + respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); return; } @@ -980,8 +998,6 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { return; } - has_session.store(true); - auto &gcm_key = launch_session->gcm_key; auto &iv = launch_session->iv; @@ -1001,6 +1017,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { session.videoThread = std::thread {videoThread, idr_events}; session.controlThread = std::thread {controlThread, idr_events}; + session_state.store(state_e::RUNNING); respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); } @@ -1016,8 +1033,12 @@ void cmd_play(host_t &host, peer_t peer, msg_t &&req) { respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); } -void rtpThread() { +void rtpThread(std::shared_ptr> shutdown_event) { input = std::make_shared(); + auto fg = util::fail_guard([&]() { + input.reset(); + }); + rtsp_server_t server(RTSP_SETUP_PORT); server.map("OPTIONS"sv, &cmd_option); @@ -1027,13 +1048,10 @@ void rtpThread() { server.map("PLAY"sv, &cmd_play); - while(true) { - server.iterate(config::stream.ping_timeout); - - if(session.video_packets && !session.video_packets->running()) { - // Ensure all threads are stopping - stop(session); + while(!shutdown_event->peek()) { + server.iterate(std::min(500ms, config::stream.ping_timeout)); + if(session_state == state_e::STOPPING) { BOOST_LOG(debug) << "Waiting for Audio to end..."sv; session.audioThread.join(); BOOST_LOG(debug) << "Waiting for Video to end..."sv; @@ -1044,8 +1062,30 @@ void rtpThread() { BOOST_LOG(debug) << "Resetting Session..."sv << std::endl; session.video_packets = video::packet_queue_t(); session.audio_packets = audio::packet_queue_t(); + + input::reset(input); + + session_state.store(state_e::STOPPED); } } + + if(session_state != state_e::STOPPED) { + // Wait until the session is properly running + while(session_state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + + stop(session); + + BOOST_LOG(debug) << "Waiting for Audio to end..."sv; + session.audioThread.join(); + BOOST_LOG(debug) << "Waiting for Video to end..."sv; + session.videoThread.join(); + BOOST_LOG(debug) << "Waiting for Control to end..."sv; + session.controlThread.join(); + + input::reset(input); + } } } diff --git a/sunshine/stream.h b/sunshine/stream.h index 891b692c..74da40a2 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -9,8 +9,16 @@ #include "crypto.h" #include "thread_safe.h" + namespace stream { +enum class state_e : int { + STOPPED, + STOPPING, + STARTING, + RUNNING, +}; + struct launch_session_t { crypto::aes_t gcm_key; crypto::aes_t iv; @@ -19,9 +27,9 @@ struct launch_session_t { }; extern safe::event_t launch_event; -extern std::atomic_bool has_session; +extern std::atomic session_state; -void rtpThread(); +void rtpThread(std::shared_ptr> shutdown_event); } diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index 08181234..24ceee01 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -29,6 +29,7 @@ public: _cv.notify_all(); } + // pop and view shoud not be used interchangebly status_t pop() { std::unique_lock ul{_lock}; @@ -49,6 +50,25 @@ public: return val; } + // pop and view shoud not be used interchangebly + const status_t &view() { + std::unique_lock ul{_lock}; + + if (!_continue) { + return util::false_v; + } + + while (!_status) { + _cv.wait(ul); + + if (!_continue) { + return util::false_v; + } + } + + return _status; + } + bool peek() { std::lock_guard lg { _lock };