diff --git a/sunshine/confighttp.cpp b/sunshine/confighttp.cpp index 7dec86e3..ed0a3b88 100644 --- a/sunshine/confighttp.cpp +++ b/sunshine/confighttp.cpp @@ -452,7 +452,9 @@ void savePin(resp_https_t response, req_https_t request) { } } -void start(std::shared_ptr shutdown_event) { +void start() { + auto shutdown_event = mail::man->event(mail::shutdown); + auto ctx = std::make_shared(boost::asio::ssl::context::tls); ctx->use_certificate_chain_file(config::nvhttp.cert); ctx->use_private_key_file(config::nvhttp.pkey, boost::asio::ssl::context::pem); diff --git a/sunshine/confighttp.h b/sunshine/confighttp.h index edd7181e..25507503 100644 --- a/sunshine/confighttp.h +++ b/sunshine/confighttp.h @@ -14,7 +14,7 @@ namespace confighttp { -void start(std::shared_ptr shutdown_event); +void start(); } #endif //SUNSHINE_CONFIGHTTP_H diff --git a/sunshine/main.cpp b/sunshine/main.cpp index 293c5b11..308e781e 100644 --- a/sunshine/main.cpp +++ b/sunshine/main.cpp @@ -19,6 +19,7 @@ #include "config.h" #include "confighttp.h" #include "httpcommon.h" +#include "main.h" #include "nvhttp.h" #include "rtsp.h" #include "thread_pool.h" @@ -30,6 +31,8 @@ extern "C" { #include } +safe::mail_t mail::man; + using namespace std::literals; namespace bl = boost::log; @@ -109,6 +112,8 @@ std::map(); + if(config::parse(argc, argv)) { return 0; } @@ -180,9 +185,10 @@ int main(int argc, char *argv[]) { } // Create signal handler after logging has been initialized - auto shutdown_event = std::make_shared>(); + auto shutdown_event = mail::man->event(mail::shutdown); on_signal(SIGINT, [shutdown_event]() { BOOST_LOG(info) << "Interrupt handler called"sv; + shutdown_event->raise(true); }); @@ -208,9 +214,9 @@ int main(int argc, char *argv[]) { task_pool.start(1); - std::thread httpThread { nvhttp::start, shutdown_event }; - std::thread configThread { confighttp::start, shutdown_event }; - stream::rtpThread(shutdown_event); + std::thread httpThread { nvhttp::start }; + std::thread configThread { confighttp::start }; + stream::rtpThread(); httpThread.join(); configThread.join(); diff --git a/sunshine/main.h b/sunshine/main.h index 8b767ad1..0c1a2869 100644 --- a/sunshine/main.h +++ b/sunshine/main.h @@ -5,7 +5,11 @@ #ifndef SUNSHINE_MAIN_H #define SUNSHINE_MAIN_H +#include + #include "thread_pool.h" +#include "thread_safe.h" + #include extern util::ThreadPool task_pool; @@ -24,4 +28,19 @@ void print_help(const char *name); std::string read_file(const char *path); int write_file(const char *path, const std::string_view &contents); + +namespace mail { +#define MAIL(x) \ + constexpr auto x = std::string_view { #x } + +extern safe::mail_t man; + +MAIL(shutdown); +MAIL(broadcast_shutdown); + + +#undef MAIL +} // namespace mail + + #endif //SUNSHINE_MAIN_H diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index f96c7a80..9b22deb9 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -758,7 +758,8 @@ void appasset(resp_https_t response, req_https_t request) { response->write(SimpleWeb::StatusCode::success_ok, in); } -void start(std::shared_ptr shutdown_event) { +void start() { + auto shutdown_event = mail::man->event(mail::shutdown); bool clean_slate = config::sunshine.flags[config::flag::FRESH_STATE]; diff --git a/sunshine/nvhttp.h b/sunshine/nvhttp.h index 7ee9f2c7..36bdee68 100644 --- a/sunshine/nvhttp.h +++ b/sunshine/nvhttp.h @@ -12,7 +12,7 @@ #include namespace nvhttp { -void start(std::shared_ptr shutdown_event); +void start(); bool pin(std::string pin); } // namespace nvhttp diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp index 2ef9e171..89197de3 100644 --- a/sunshine/rtsp.cpp +++ b/sunshine/rtsp.cpp @@ -494,7 +494,10 @@ void cmd_play(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); } -void rtpThread(std::shared_ptr shutdown_event) { +void rtpThread() { + auto shutdown_event = mail::man->event(mail::shutdown); + auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); + server.map("OPTIONS"sv, &cmd_option); server.map("DESCRIBE"sv, &cmd_describe); server.map("SETUP"sv, &cmd_setup); @@ -512,7 +515,7 @@ void rtpThread(std::shared_ptr shutdown_event) { while(!shutdown_event->peek()) { server.iterate(std::min(500ms, config::stream.ping_timeout)); - if(broadcast_shutdown_event.peek()) { + if(broadcast_shutdown_event->peek()) { server.clear(); } else { diff --git a/sunshine/rtsp.h b/sunshine/rtsp.h index f5e085b3..64d3cc85 100644 --- a/sunshine/rtsp.h +++ b/sunshine/rtsp.h @@ -21,7 +21,7 @@ struct launch_session_t { void launch_session_raise(launch_session_t launch_session); int session_count(); -void rtpThread(std::shared_ptr shutdown_event); +void rtpThread(); } // namespace stream diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index b025e7e9..cf0e598f 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -153,6 +153,7 @@ struct broadcast_ctx_t { video::packet_queue_t video_packets; audio::packet_queue_t audio_packets; + std::shared_ptr> broadcast_shutdown_event; message_queue_queue_t message_queue_queue; std::thread recv_thread; @@ -207,7 +208,6 @@ void end_broadcast(broadcast_ctx_t &ctx); static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); -safe::signal_t broadcast_shutdown_event; session_t *control_server_t::get_session(const net::peer_t peer) { TUPLE_2D(port, addr_string, platf::from_sockaddr_ex((sockaddr *)&peer->address.address)); @@ -594,7 +594,7 @@ void recvThread(broadcast_ctx_t &ctx) { video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]); audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]); - while(!broadcast_shutdown_event.peek()) { + while(!ctx.broadcast_shutdown_event->peek()) { io.run(); } } @@ -722,6 +722,8 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud } int start_broadcast(broadcast_ctx_t &ctx) { + ctx.broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); + if(ctx.control_server.bind(CONTROL_PORT)) { BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << CONTROL_PORT << "], likely another process already bound to the port"sv; @@ -761,9 +763,9 @@ int start_broadcast(broadcast_ctx_t &ctx) { ctx.audio_packets = std::make_shared(30); ctx.message_queue_queue = std::make_shared(30); - ctx.video_thread = std::thread { videoBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.video_sock), ctx.video_packets }; - ctx.audio_thread = std::thread { audioBroadcastThread, &broadcast_shutdown_event, std::ref(ctx.audio_sock), ctx.audio_packets }; - ctx.control_thread = std::thread { controlBroadcastThread, &broadcast_shutdown_event, &ctx.control_server }; + ctx.video_thread = std::thread { videoBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.video_sock), ctx.video_packets }; + ctx.audio_thread = std::thread { audioBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.audio_sock), ctx.audio_packets }; + ctx.control_thread = std::thread { controlBroadcastThread, ctx.broadcast_shutdown_event.get(), &ctx.control_server }; ctx.recv_thread = std::thread { recvThread, std::ref(ctx) }; @@ -771,7 +773,7 @@ int start_broadcast(broadcast_ctx_t &ctx) { } void end_broadcast(broadcast_ctx_t &ctx) { - broadcast_shutdown_event.raise(true); + ctx.broadcast_shutdown_event->raise(true); // Minimize delay stopping video/audio threads ctx.video_packets->stop(); @@ -796,7 +798,7 @@ void end_broadcast(broadcast_ctx_t &ctx) { ctx.control_thread.join(); BOOST_LOG(debug) << "All broadcasting threads ended"sv; - broadcast_shutdown_event.reset(); + ctx.broadcast_shutdown_event->reset(); } int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) { diff --git a/sunshine/stream.h b/sunshine/stream.h index 9c8dc5ae..ed526ce8 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -35,8 +35,6 @@ void stop(session_t &session); void join(session_t &session); state_e state(session_t &session); } // namespace session - -extern safe::signal_t broadcast_shutdown_event; } // namespace stream #endif //SUNSHINE_STREAM_H diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index 8918a676..f36d4182 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -426,6 +427,83 @@ auto make_shared(F_Construct &&fc, F_Destruct &&fd) { } using signal_t = event_t; + +class mail_raw_t; +using mail_t = std::shared_ptr; + +void cleanup(mail_raw_t *); +template +class post_t : public T { +public: + template + post_t(mail_t mail, Args &&...args) : T(std::forward(args)...), mail { std::move(mail) } {} + + mail_t mail; + + ~post_t() { + cleanup(mail.get()); + } +}; + +template +inline auto lock(const std::weak_ptr &wp) { + return std::reinterpret_pointer_cast>(wp.lock()); +} + +class mail_raw_t : public std::enable_shared_from_this { +public: + template + std::shared_ptr>> event(const std::string_view &id) { + std::lock_guard lg { mutex }; + + auto it = id_to_post.find(id); + if(it != std::end(id_to_post)) { + return lock>(it->second); + } + + auto post = std::make_shared>>(shared_from_this()); + id_to_post.emplace(std::pair> { std::string { id }, post }); + + return post; + } + + template + std::shared_ptr>> queue(const std::string_view &id) { + std::lock_guard lg { mutex }; + + auto it = id_to_post.find(id); + if(it != std::end(id_to_post)) { + return lock>(it->second); + } + + auto post = std::make_shared>>(shared_from_this(), 32); + id_to_post.emplace(std::pair> { std::string { id }, post }); + + return post; + } + + void cleanup() { + std::lock_guard lg { mutex }; + + for(auto it = std::begin(id_to_post); it != std::end(id_to_post); ++it) { + auto &weak = it->second; + + if(weak.expired()) { + id_to_post.erase(it); + + return; + } + } + } + + std::mutex mutex; + + std::map, std::less<>> id_to_post; +}; + +inline void cleanup(mail_raw_t *mail) { + mail->cleanup(); +} } // namespace safe #endif //SUNSHINE_THREAD_SAFE_H