Proper Interrupt Handling

This commit is contained in:
loki 2020-01-20 00:22:13 +01:00
parent ad5cddd426
commit a23494b10c
7 changed files with 121 additions and 24 deletions

View File

@ -7,6 +7,7 @@
#include <thread> #include <thread>
#include <filesystem> #include <filesystem>
#include <iostream> #include <iostream>
#include <csignal>
#include <boost/log/common.hpp> #include <boost/log/common.hpp>
#include <boost/log/sinks.hpp> #include <boost/log/sinks.hpp>
@ -49,6 +50,18 @@ void log_flush() {
sink->flush(); sink->flush();
} }
std::map<int, std::function<void()>> signal_handlers;
void on_signal_forwarder(int sig) {
signal_handlers.at(sig)();
}
template<class FN>
void on_signal(int sig, FN &&fn) {
signal_handlers.emplace(sig, std::forward<FN>(fn));
std::signal(sig, on_signal_forwarder);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
if(argc > 1) { if(argc > 1) {
if(!std::filesystem::exists(argv[1])) { if(!std::filesystem::exists(argv[1])) {
@ -96,6 +109,13 @@ int main(int argc, char *argv[]) {
bl::core::get()->add_sink(sink); bl::core::get()->add_sink(sink);
auto fg = util::fail_guard(log_flush); auto fg = util::fail_guard(log_flush);
// Create signal handler after logging has been initialized
auto shutdown_event = std::make_shared<safe::event_t<bool>>();
on_signal(SIGINT, [shutdown_event]() {
BOOST_LOG(info) << "Interrupt handler called"sv;
shutdown_event->raise(true);
});
auto proc_opt = proc::parse(config::stream.file_apps); auto proc_opt = proc::parse(config::stream.file_apps);
if(!proc_opt) { if(!proc_opt) {
return 7; return 7;
@ -107,9 +127,9 @@ int main(int argc, char *argv[]) {
task_pool.start(1); task_pool.start(1);
std::thread httpThread { nvhttp::start }; std::thread httpThread { nvhttp::start, shutdown_event };
stream::rtpThread(shutdown_event);
stream::rtpThread();
httpThread.join(); httpThread.join();
return 0; return 0;

View File

@ -519,7 +519,7 @@ void launch(resp_https_t response, req_https_t request) {
stream::launch_session_t launch_session; stream::launch_session_t launch_session;
if(stream::has_session) { if(stream::session_state != stream::state_e::STOPPED) {
tree.put("root.<xmlattr>.status_code", 503); tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.gamesession", 0); tree.put("root.gamesession", 0);
@ -576,7 +576,7 @@ void resume(resp_https_t response, req_https_t request) {
}); });
auto current_appid = proc::proc.running(); auto current_appid = proc::proc.running();
if(current_appid == -1 || stream::has_session) { if(current_appid == -1 || stream::session_state != stream::state_e::STOPPED) {
tree.put("root.resume", 0); tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503); tree.put("root.<xmlattr>.status_code", 503);
@ -620,7 +620,7 @@ void cancel(resp_https_t response, req_https_t request) {
return; return;
} }
if(stream::has_session) { if(stream::session_state != stream::state_e::STOPPED) {
tree.put("root.<xmlattr>.status_code", 503); tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.cancel", 0); tree.put("root.cancel", 0);
@ -640,7 +640,7 @@ void appasset(resp_https_t response, req_https_t request) {
response->write(SimpleWeb::StatusCode::success_ok, in); response->write(SimpleWeb::StatusCode::success_ok, in);
} }
void start() { void start(std::shared_ptr<safe::event_t<bool>> shutdown_event) {
local_ip = platf::get_local_ip(); local_ip = platf::get_local_ip();
origin_pin_allowed = net::from_enum_string(config::nvhttp.origin_pin_allowed); origin_pin_allowed = net::from_enum_string(config::nvhttp.origin_pin_allowed);
@ -734,6 +734,12 @@ void start() {
std::thread ssl { &https_server_t::start, &https_server }; std::thread ssl { &https_server_t::start, &https_server };
std::thread tcp { &http_server_t::start, &http_server }; std::thread tcp { &http_server_t::start, &http_server };
// Wait for any event
shutdown_event->view();
https_server.stop();
http_server.stop();
ssl.join(); ssl.join();
tcp.join(); tcp.join();
} }

View File

@ -8,12 +8,14 @@
#include <functional> #include <functional>
#include <string> #include <string>
#include "thread_safe.h"
#define CA_DIR SUNSHINE_ASSETS_DIR "/demoCA" #define CA_DIR SUNSHINE_ASSETS_DIR "/demoCA"
#define PRIVATE_KEY_FILE CA_DIR "/cakey.pem" #define PRIVATE_KEY_FILE CA_DIR "/cakey.pem"
#define CERTIFICATE_FILE CA_DIR "/cacert.pem" #define CERTIFICATE_FILE CA_DIR "/cacert.pem"
namespace nvhttp { namespace nvhttp {
void start(); void start(std::shared_ptr<safe::event_t<bool>> shutdown_event);
} }
#endif //SUNSHINE_NVHTTP_H #endif //SUNSHINE_NVHTTP_H

View File

@ -70,6 +70,7 @@ public:
virtual ~mic_t() = default; virtual ~mic_t() = default;
}; };
void freeInput(void*); void freeInput(void*);
using input_t = util::safe_ptr<void, freeInput>; using input_t = util::safe_ptr<void, freeInput>;

View File

@ -94,7 +94,6 @@ struct audio_packet_raw_t {
safe::event_t<launch_session_t> launch_event; safe::event_t<launch_session_t> launch_event;
//FIXME: This smells bad
std::shared_ptr<input::input_t> input; std::shared_ptr<input::input_t> input;
struct config_t { struct config_t {
@ -123,7 +122,7 @@ struct session_t {
bool has_process; bool has_process;
} session; } session;
std::atomic_bool has_session; std::atomic<state_e> session_state;
void free_host(ENetHost *host) { void free_host(ENetHost *host) {
std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) { std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) {
@ -165,8 +164,8 @@ void stop(session_t &session) {
session.video_packets->stop(); session.video_packets->stop();
session.audio_packets->stop(); session.audio_packets->stop();
input::reset(input); auto expected = state_e::RUNNING;
has_session.store(false); session_state.compare_exchange_strong(expected, state_e::STOPPING);
} }
class rtsp_server_t { class rtsp_server_t {
@ -549,7 +548,11 @@ void controlThread(video::idr_event_t idr_events) {
input::passthrough(input, std::move(plaintext)); input::passthrough(input, std::move(plaintext));
}); });
while(has_session) { while(session_state == state_e::STARTING) {
std::this_thread::sleep_for(1ms);
}
while(session_state == state_e::RUNNING) {
if(std::chrono::steady_clock::now() > session.pingTimeout) { if(std::chrono::steady_clock::now() > session.pingTimeout) {
BOOST_LOG(debug) << "Ping timeout"sv; BOOST_LOG(debug) << "Ping timeout"sv;
@ -641,6 +644,10 @@ std::optional<udp::endpoint> recv_peer(std::shared_ptr<safe::queue_t<T>> &queue,
} }
void audioThread() { void audioThread() {
while(session_state == state_e::STARTING) {
std::this_thread::sleep_for(1ms);
}
auto &config = session.config; auto &config = session.config;
asio::io_service io; asio::io_service io;
@ -676,6 +683,10 @@ void audioThread() {
} }
void videoThread(video::idr_event_t idr_events) { void videoThread(video::idr_event_t idr_events) {
while(session_state == state_e::STARTING) {
std::this_thread::sleep_for(1ms);
}
auto &config = session.config; auto &config = session.config;
int lowseq = 0; int lowseq = 0;
@ -908,9 +919,16 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
auto seqn_str = std::to_string(req->sequenceNumber); auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str()); option.content = const_cast<char*>(seqn_str.c_str());
if(has_session || !launch_event.peek()) { auto expected_state = state_e::STOPPED;
auto abort = !session_state.compare_exchange_strong(expected_state, state_e::STARTING);
if(abort || !launch_event.peek()) {
//Either already streaming or /launch has not been used //Either already streaming or /launch has not been used
if(!abort) {
session_state.store(state_e::STOPPED);
}
respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return; return;
} }
@ -980,8 +998,6 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
return; return;
} }
has_session.store(true);
auto &gcm_key = launch_session->gcm_key; auto &gcm_key = launch_session->gcm_key;
auto &iv = launch_session->iv; auto &iv = launch_session->iv;
@ -1001,6 +1017,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
session.videoThread = std::thread {videoThread, idr_events}; session.videoThread = std::thread {videoThread, idr_events};
session.controlThread = std::thread {controlThread, idr_events}; session.controlThread = std::thread {controlThread, idr_events};
session_state.store(state_e::RUNNING);
respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); respond(host, peer, &option, 200, "OK", req->sequenceNumber, {});
} }
@ -1016,8 +1033,12 @@ void cmd_play(host_t &host, peer_t peer, msg_t &&req) {
respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); respond(host, peer, &option, 200, "OK", req->sequenceNumber, {});
} }
void rtpThread() { void rtpThread(std::shared_ptr<safe::event_t<bool>> shutdown_event) {
input = std::make_shared<input::input_t>(); input = std::make_shared<input::input_t>();
auto fg = util::fail_guard([&]() {
input.reset();
});
rtsp_server_t server(RTSP_SETUP_PORT); rtsp_server_t server(RTSP_SETUP_PORT);
server.map("OPTIONS"sv, &cmd_option); server.map("OPTIONS"sv, &cmd_option);
@ -1027,13 +1048,10 @@ void rtpThread() {
server.map("PLAY"sv, &cmd_play); server.map("PLAY"sv, &cmd_play);
while(true) { while(!shutdown_event->peek()) {
server.iterate(config::stream.ping_timeout); server.iterate(std::min(500ms, config::stream.ping_timeout));
if(session.video_packets && !session.video_packets->running()) {
// Ensure all threads are stopping
stop(session);
if(session_state == state_e::STOPPING) {
BOOST_LOG(debug) << "Waiting for Audio to end..."sv; BOOST_LOG(debug) << "Waiting for Audio to end..."sv;
session.audioThread.join(); session.audioThread.join();
BOOST_LOG(debug) << "Waiting for Video to end..."sv; BOOST_LOG(debug) << "Waiting for Video to end..."sv;
@ -1044,8 +1062,30 @@ void rtpThread() {
BOOST_LOG(debug) << "Resetting Session..."sv << std::endl; BOOST_LOG(debug) << "Resetting Session..."sv << std::endl;
session.video_packets = video::packet_queue_t(); session.video_packets = video::packet_queue_t();
session.audio_packets = audio::packet_queue_t(); session.audio_packets = audio::packet_queue_t();
input::reset(input);
session_state.store(state_e::STOPPED);
} }
} }
if(session_state != state_e::STOPPED) {
// Wait until the session is properly running
while(session_state == state_e::STARTING) {
std::this_thread::sleep_for(1ms);
}
stop(session);
BOOST_LOG(debug) << "Waiting for Audio to end..."sv;
session.audioThread.join();
BOOST_LOG(debug) << "Waiting for Video to end..."sv;
session.videoThread.join();
BOOST_LOG(debug) << "Waiting for Control to end..."sv;
session.controlThread.join();
input::reset(input);
}
} }
} }

View File

@ -9,8 +9,16 @@
#include "crypto.h" #include "crypto.h"
#include "thread_safe.h" #include "thread_safe.h"
namespace stream { namespace stream {
enum class state_e : int {
STOPPED,
STOPPING,
STARTING,
RUNNING,
};
struct launch_session_t { struct launch_session_t {
crypto::aes_t gcm_key; crypto::aes_t gcm_key;
crypto::aes_t iv; crypto::aes_t iv;
@ -19,9 +27,9 @@ struct launch_session_t {
}; };
extern safe::event_t<launch_session_t> launch_event; extern safe::event_t<launch_session_t> launch_event;
extern std::atomic_bool has_session; extern std::atomic<state_e> session_state;
void rtpThread(); void rtpThread(std::shared_ptr<safe::event_t<bool>> shutdown_event);
} }

View File

@ -29,6 +29,7 @@ public:
_cv.notify_all(); _cv.notify_all();
} }
// pop and view shoud not be used interchangebly
status_t pop() { status_t pop() {
std::unique_lock ul{_lock}; std::unique_lock ul{_lock};
@ -49,6 +50,25 @@ public:
return val; return val;
} }
// pop and view shoud not be used interchangebly
const status_t &view() {
std::unique_lock ul{_lock};
if (!_continue) {
return util::false_v<status_t>;
}
while (!_status) {
_cv.wait(ul);
if (!_continue) {
return util::false_v<status_t>;
}
}
return _status;
}
bool peek() { bool peek() {
std::lock_guard lg { _lock }; std::lock_guard lg { _lock };