From 5cd0fd76bf9932cc403fc720c4675d30fb3370bd Mon Sep 17 00:00:00 2001 From: loki Date: Sat, 8 Feb 2020 16:26:38 +0100 Subject: [PATCH] Compile for Multicasting --- CMakeLists.txt | 2 + assets/sunshine.conf | 8 + sunshine/audio.cpp | 8 +- sunshine/audio.h | 4 +- sunshine/config.cpp | 16 +- sunshine/config.h | 3 + sunshine/main.cpp | 8 +- sunshine/network.cpp | 19 + sunshine/network.h | 13 + sunshine/nvhttp.cpp | 46 +- sunshine/platform/common.h | 3 + sunshine/platform/windows.cpp | 10 +- sunshine/process.cpp | 3 + sunshine/process.h | 2 + sunshine/rtsp.cpp | 488 +++++++++++++++++ sunshine/rtsp.h | 25 + sunshine/stream.cpp | 964 ++++++++++------------------------ sunshine/stream.h | 80 ++- sunshine/thread_safe.h | 143 +++++ sunshine/video.cpp | 207 +++++--- sunshine/video.h | 31 +- 21 files changed, 1259 insertions(+), 824 deletions(-) create mode 100644 sunshine/rtsp.cpp create mode 100644 sunshine/rtsp.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1adcf1af..6eecca90 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,6 +128,8 @@ set(SUNSHINE_TARGET_FILES sunshine/crypto.h sunshine/nvhttp.cpp sunshine/nvhttp.h + sunshine/rtsp.cpp + sunshine/rtsp.h sunshine/stream.cpp sunshine/stream.h sunshine/video.cpp diff --git a/assets/sunshine.conf b/assets/sunshine.conf index 1501bb98..3831ff73 100644 --- a/assets/sunshine.conf +++ b/assets/sunshine.conf @@ -50,6 +50,14 @@ # The value must be greater than 0 and lower than or equal to 100 # fec_percentage = 10 +# When multicasting, it could be usefull to have different configurations for each connected Client. +# For example: +# Clients connected through WAN and LAN have different bitrate contstraints. +# Decoders may require different settings for color +# +# Unlike simply broadcasting to multiple Client, this will generate distinct video streams. +# Note, CPU usage increases for each distinct video stream generated +# channels = 1 # The back/select button on the controller # On the Shield, the home and powerbutton are not passed to Moonlight diff --git a/sunshine/audio.cpp b/sunshine/audio.cpp index 73fc1160..e577a01f 100644 --- a/sunshine/audio.cpp +++ b/sunshine/audio.cpp @@ -50,7 +50,7 @@ static opus_stream_config_t HighSurround51 = { map_high_surround51 }; -void encodeThread(std::shared_ptr> packets, sample_queue_t samples, config_t config) { +void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t config, void *channel_data) { //FIXME: Pick correct opus_stream_config_t based on config.channels auto stream = &stereo; opus_t opus { opus_multistream_encoder_create( @@ -76,13 +76,13 @@ void encodeThread(std::shared_ptr> packets, sample_queue } packet.fake_resize(bytes); - packets->raise(std::move(packet)); + packets->raise(std::make_pair(channel_data, std::move(packet))); } } -void capture(std::shared_ptr> packets, config_t config) { +void capture(packet_queue_t packets, config_t config, void *channel_data) { auto samples = std::make_shared(); - std::thread thread { encodeThread, packets, samples, config }; + std::thread thread { encodeThread, packets, samples, config, channel_data }; auto fg = util::fail_guard([&]() { packets->stop(); diff --git a/sunshine/audio.h b/sunshine/audio.h index 9b181fdf..1628d029 100644 --- a/sunshine/audio.h +++ b/sunshine/audio.h @@ -11,8 +11,8 @@ struct config_t { }; using packet_t = util::buffer_t; -using packet_queue_t = std::shared_ptr>; -void capture(std::shared_ptr> packets, config_t config); +using packet_queue_t = std::shared_ptr>>; +void capture(packet_queue_t packets, config_t config, void *channel_data); } #endif diff --git a/sunshine/config.cpp b/sunshine/config.cpp index a0ca7cdf..5a8f6ff5 100644 --- a/sunshine/config.cpp +++ b/sunshine/config.cpp @@ -35,7 +35,8 @@ stream_t stream { APPS_JSON_PATH, - 10 // fecPercentage + 10, // fecPercentage + 1 // channels }; nvhttp_t nvhttp { @@ -184,10 +185,15 @@ void parse_file(const char *file) { }); int to = -1; - int_f(vars, "ping_timeout", to); - if(to > 0) { - stream.ping_timeout = std::chrono::milliseconds(to); - } + int_between_f(vars, "ping_timeout", to, { + -1, std::numeric_limits::max() + }); + stream.ping_timeout = std::chrono::milliseconds(to); + + int_between_f(vars, "channels", stream.channels, { + 1, std::numeric_limits::max() + }); + string_f(vars, "file_apps", stream.file_apps); int_between_f(vars, "fec_percentage", stream.fec_percentage, { 1, 100 diff --git a/sunshine/config.h b/sunshine/config.h index b27d3e2a..9185b9ef 100644 --- a/sunshine/config.h +++ b/sunshine/config.h @@ -30,6 +30,9 @@ struct stream_t { std::string file_apps; int fec_percentage; + + // max unique instances of video and audio streams + int channels; }; struct nvhttp_t { diff --git a/sunshine/main.cpp b/sunshine/main.cpp index 034bc6de..78a79748 100644 --- a/sunshine/main.cpp +++ b/sunshine/main.cpp @@ -15,7 +15,7 @@ #include #include "nvhttp.h" -#include "stream.h" +#include "rtsp.h" #include "config.h" #include "thread_pool.h" @@ -124,6 +124,12 @@ int main(int argc, char *argv[]) { return 7; } + { + proc::ctx_t ctx; + ctx.name = "Desktop"s; + proc_opt->get_apps().emplace(std::begin(proc_opt->get_apps()), std::move(ctx)); + } + proc::proc = std::move(*proc_opt); auto deinit_guard = platf::init(); diff --git a/sunshine/network.cpp b/sunshine/network.cpp index f49c9af8..029b1c76 100644 --- a/sunshine/network.cpp +++ b/sunshine/network.cpp @@ -93,4 +93,23 @@ std::string_view to_enum_string(net_e net) { // avoid warning return "wan"sv; } + +host_t host_create(ENetAddress &addr, std::size_t peers, std::uint16_t port) { + enet_address_set_host(&addr, "0.0.0.0"); + enet_address_set_port(&addr, port); + + return host_t { enet_host_create(PF_INET, &addr, peers, 1, 0, 0) }; +} + +void free_host(ENetHost *host) { + std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) { + ENetPeer *peer = &peer_ref; + + if(peer) { + enet_peer_disconnect_now(peer, 0); + } + }); + + enet_host_destroy(host); +} } \ No newline at end of file diff --git a/sunshine/network.h b/sunshine/network.h index 8ddb8aef..b3e65a97 100644 --- a/sunshine/network.h +++ b/sunshine/network.h @@ -6,7 +6,18 @@ #define SUNSHINE_NETWORK_H #include + +#include + +#include "utility.h" + namespace net { +void free_host(ENetHost *host); + +using host_t = util::safe_ptr; +using peer_t = ENetPeer*; +using packet_t = util::safe_ptr; + enum net_e : int { PC, LAN, @@ -17,6 +28,8 @@ net_e from_enum_string(const std::string_view &view); std::string_view to_enum_string(net_e net); net_e from_address(const std::string_view &view); + +host_t host_create(ENetAddress &addr, std::size_t peers, std::uint16_t port); } #endif //SUNSHINE_NETWORK_H diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index cad3d516..3807a6fd 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -18,7 +18,8 @@ #include "config.h" #include "utility.h" -#include "stream.h" +#include "rtsp.h" +#include "crypto.h" #include "nvhttp.h" #include "platform/common.h" #include "network.h" @@ -504,17 +505,14 @@ void applist(resp_https_t response, req_https_t request) { pt::ptree desktop; apps.put(".status_code", 200); - desktop.put("IsHdrSupported"s, config::video.hevc_mode == 2 ? 1 : 0); - desktop.put("AppTitle"s, "Desktop"); - desktop.put("ID"s, 1); - int x = 2; + int x = 0; for(auto &proc : proc::proc.get_apps()) { pt::ptree app; app.put("IsHdrSupported"s, config::video.hevc_mode == 2 ? 1 : 0); app.put("AppTitle"s, proc.name); - app.put("ID"s, x++); + app.put("ID"s, ++x); apps.push_back(std::make_pair("App", std::move(app))); } @@ -536,17 +534,15 @@ void launch(resp_https_t response, req_https_t request) { auto args = request->parse_query_string(); auto appid = util::from_view(args.at("appid")) -2; - stream::launch_session_t launch_session; - - if(stream::session_state != stream::state_e::STOPPED) { - tree.put("root..status_code", 503); - tree.put("root.gamesession", 0); + auto current_appid = proc::proc.running(); + if(current_appid != -1) { + tree.put("root.resume", 0); + tree.put("root..status_code", 400); return; } - auto current_appid = proc::proc.running(); - if(appid >= 0 && appid != current_appid) { + if(appid >= 0) { auto err = proc::proc.execute(appid); if(err) { tree.put("root..status_code", err); @@ -554,12 +550,9 @@ void launch(resp_https_t response, req_https_t request) { return; } - - current_appid = appid; } - // Needed to determine if session must be closed when no process is running in proc::proc - launch_session.has_process = current_appid >= 0; + stream::launch_session_t launch_session; auto clientID = args.at("uniqueid"s); launch_session.gcm_key = *util::from_hex(args.at("rikey"s), true); @@ -571,14 +564,6 @@ void launch(resp_https_t response, req_https_t request) { stream::launch_event.raise(launch_session); -/* - bool sops = args.at("sops"s) == "1"; - std::optional gcmap { std::nullopt }; - if(auto it = args.find("gcmap"s); it != std::end(args)) { - gcmap = std::stoi(it->second); - } -*/ - tree.put("root..status_code", 200); tree.put("root.gamesession", 1); } @@ -595,7 +580,7 @@ void resume(resp_https_t response, req_https_t request) { }); auto current_appid = proc::proc.running(); - if(current_appid == -1 || stream::session_state != stream::state_e::STOPPED) { + if(current_appid == -1) { tree.put("root.resume", 0); tree.put("root..status_code", 503); @@ -603,8 +588,6 @@ void resume(resp_https_t response, req_https_t request) { } stream::launch_session_t launch_session; - // Needed to determine if session must be closed when no process is running in proc::proc - launch_session.has_process = current_appid >= 0; auto args = request->parse_query_string(); auto clientID = args.at("uniqueid"s); @@ -639,13 +622,6 @@ void cancel(resp_https_t response, req_https_t request) { return; } - if(stream::session_state != stream::state_e::STOPPED) { - tree.put("root..status_code", 503); - tree.put("root.cancel", 0); - - return; - } - proc::proc.terminate(); tree.put("root.cancel", 1); diff --git a/sunshine/platform/common.h b/sunshine/platform/common.h index 184c0d38..01e659e4 100644 --- a/sunshine/platform/common.h +++ b/sunshine/platform/common.h @@ -8,6 +8,7 @@ #include #include "sunshine/utility.h" +struct sockaddr; namespace platf { constexpr auto MAX_GAMEPADS = 2; @@ -86,6 +87,8 @@ using input_t = util::safe_ptr; std::string get_mac_address(const std::string_view &address); +std::string from_sockaddr(const sockaddr *const); + std::unique_ptr microphone(std::uint32_t sample_rate); std::unique_ptr display(); diff --git a/sunshine/platform/windows.cpp b/sunshine/platform/windows.cpp index 2d89ddc6..23eeaa55 100755 --- a/sunshine/platform/windows.cpp +++ b/sunshine/platform/windows.cpp @@ -86,16 +86,16 @@ public: client_t client; }; -std::string from_socket_address(const SOCKET_ADDRESS &socket_address) { +std::string from_sockaddr(const sockaddr *const socket_address) { char data[INET6_ADDRSTRLEN]; - auto family = socket_address.lpSockaddr->sa_family; + auto family = socket_address->sa_family; if(family == AF_INET6) { - inet_ntop(AF_INET6, &((sockaddr_in6*)socket_address.lpSockaddr)->sin6_addr, data, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &((sockaddr_in6*)socket_address)->sin6_addr, data, INET6_ADDRSTRLEN); } if(family == AF_INET) { - inet_ntop(AF_INET, &((sockaddr_in*)socket_address.lpSockaddr)->sin_addr, data, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &((sockaddr_in*)socket_address)->sin_addr, data, INET_ADDRSTRLEN); } return std::string { data }; @@ -116,7 +116,7 @@ std::string get_mac_address(const std::string_view &address) { adapteraddrs_t info = get_adapteraddrs(); for(auto adapter_pos = info.get(); adapter_pos != nullptr; adapter_pos = adapter_pos->Next) { for(auto addr_pos = adapter_pos->FirstUnicastAddress; addr_pos != nullptr; addr_pos = addr_pos->Next) { - if(adapter_pos->PhysicalAddressLength != 0 && address == from_socket_address(addr_pos->Address)) { + if(adapter_pos->PhysicalAddressLength != 0 && address == from_sockaddr(addr_pos->Address.lpSockaddr)) { std::stringstream mac_addr; mac_addr << std::hex; for(int i = 0; i < adapter_pos->PhysicalAddressLength; i++) { diff --git a/sunshine/process.cpp b/sunshine/process.cpp index c4058614..fe39a765 100644 --- a/sunshine/process.cpp +++ b/sunshine/process.cpp @@ -163,6 +163,9 @@ void proc_t::terminate() { const std::vector &proc_t::get_apps() const { return _apps; } +std::vector &proc_t::get_apps() { + return _apps; +} proc_t::~proc_t() { terminate(); diff --git a/sunshine/process.h b/sunshine/process.h index ad42ce2f..37c17acd 100644 --- a/sunshine/process.h +++ b/sunshine/process.h @@ -67,6 +67,8 @@ public: ~proc_t(); const std::vector &get_apps() const; + std::vector &get_apps(); + void terminate(); private: diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp new file mode 100644 index 00000000..98135563 --- /dev/null +++ b/sunshine/rtsp.cpp @@ -0,0 +1,488 @@ +// +// Created by loki on 2/2/20. +// + +extern "C" { +#include +} + +#include "config.h" +#include "main.h" +#include "network.h" +#include "rtsp.h" +#include "input.h" +#include "stream.h" + +namespace asio = boost::asio; + +using asio::ip::tcp; +using asio::ip::udp; + +using namespace std::literals; + +namespace stream { +constexpr auto RTSP_SETUP_PORT = 48010; + +void free_msg(PRTSP_MESSAGE msg) { + freeMessage(msg); + + delete msg; +} + +class rtsp_server_t; + +using msg_t = util::safe_ptr; +using cmd_func_t = std::function &, net::peer_t, msg_t&&)>; + +safe::event_t launch_event; + +void print_msg(PRTSP_MESSAGE msg); +void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req); + +class rtsp_server_t { +public: + rtsp_server_t(rtsp_server_t &&) noexcept = default; + rtsp_server_t &operator=(rtsp_server_t &&) noexcept = default; + + explicit rtsp_server_t(std::uint16_t port) : _session_slots (config::stream.channels), _host {net::host_create(_addr, 1, port) } {} + + template + void iterate(std::chrono::duration timeout) { + ENetEvent event; + auto res = enet_host_service(_host.get(), &event, std::chrono::floor(timeout).count()); + + if (res > 0) { + switch (event.type) { + case ENET_EVENT_TYPE_RECEIVE: { + net::packet_t packet{event.packet}; + net::peer_t peer{event.peer}; + + msg_t req { new msg_t::element_type }; + + //TODO: compare addresses of the peers + if (_queue_packet.second == nullptr) { + parseRtspMessage(req.get(), (char *) packet->data, packet->dataLength); + for (auto option = req->options; option != nullptr; option = option->next) { + if ("Content-length"sv == option->option) { + _queue_packet = std::make_pair(peer, std::move(packet)); + return; + } + } + } + else { + std::vector full_payload; + + auto old_msg = std::move(_queue_packet); + TUPLE_2D_REF(_, old_packet, old_msg); + + std::string_view new_payload{(char *) packet->data, packet->dataLength}; + std::string_view old_payload{(char *) old_packet->data, old_packet->dataLength}; + full_payload.resize(new_payload.size() + old_payload.size()); + + std::copy(std::begin(old_payload), std::end(old_payload), std::begin(full_payload)); + std::copy(std::begin(new_payload), std::end(new_payload), std::begin(full_payload) + old_payload.size()); + + parseRtspMessage(req.get(), full_payload.data(), full_payload.size()); + } + + print_msg(req.get()); + + msg_t resp; + auto func = _map_cmd_cb.find(req->message.request.command); + if (func != std::end(_map_cmd_cb)) { + func->second(this, nullptr, peer, std::move(req)); + } + else { + cmd_not_found(host(), peer, std::move(req)); + } + + return; + } + case ENET_EVENT_TYPE_CONNECT: + BOOST_LOG(info) << "CLIENT CONNECTED TO RTSP"sv; + break; + case ENET_EVENT_TYPE_DISCONNECT: + BOOST_LOG(info) << "CLIENT DISCONNECTED FROM RTSP"sv; + break; + case ENET_EVENT_TYPE_NONE: + break; + } + } + } + + void map(const std::string_view &type, cmd_func_t cb) { + _map_cmd_cb.emplace(type, std::move(cb)); + } + + void stop() { + for(auto &slot : _session_slots) { + auto session = slot.lock(); + + if (!session) { + continue; + } + + // Wait until the session is properly running + while (session->state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + + ::stream::stop(*session); + + BOOST_LOG(debug) << "Waiting for Audio to end..."sv; + session->audioThread.join(); + BOOST_LOG(debug) << "Waiting for Video to end..."sv; + session->videoThread.join(); + + input::reset(input); + } + } + + bool accept(const std::shared_ptr &session) { + for(auto &slot : _session_slots) { + if(slot.expired()) { + slot = session; + + return true; + } + } + + return false; + } + + net::host_t::pointer host() const { + return _host.get(); + } +private: + + // named _queue_packet because I want to make it an actual queue + // It's like this for convenience sake + std::pair _queue_packet; + + std::unordered_map _map_cmd_cb; + + std::vector> _session_slots; + + ENetAddress _addr; + net::host_t _host; +}; + +void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) { + auto payload = std::make_pair(resp->payload, resp->payloadLength); + + auto lg = util::fail_guard([&]() { + resp->payload = payload.first; + resp->payloadLength = payload.second; + }); + + resp->payload = nullptr; + resp->payloadLength = 0; + + int serialized_len; + util::c_ptr raw_resp { serializeRtspMessage(resp.get(), &serialized_len) }; + BOOST_LOG(debug) + << "---Begin Response---"sv << std::endl + << std::string_view { raw_resp.get(), (std::size_t)serialized_len } << std::endl + << std::string_view { payload.first, (std::size_t)payload.second } << std::endl + << "---End Response---"sv << std::endl; + + std::string_view tmp_resp { raw_resp.get(), (size_t)serialized_len }; + { + auto packet = enet_packet_create(tmp_resp.data(), tmp_resp.size(), ENET_PACKET_FLAG_RELIABLE); + if(enet_peer_send(peer, 0, packet)) { + enet_packet_destroy(packet); + return; + } + + enet_host_flush(host); + } + + if(payload.second > 0) { + auto packet = enet_packet_create(payload.first, payload.second, ENET_PACKET_FLAG_RELIABLE); + if(enet_peer_send(peer, 0, packet)) { + enet_packet_destroy(packet); + return; + } + + enet_host_flush(host); + } +} + +void respond(net::host_t::pointer host, net::peer_t peer, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { + msg_t resp { new msg_t::element_type }; + createRtspResponse(resp.get(), nullptr, 0, const_cast("RTSP/1.0"), statuscode, const_cast(status_msg), seqn, options, const_cast(payload.data()), (int)payload.size()); + + respond(host, peer, resp); +} + +void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req) { + respond(host, peer, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); +} + +void cmd_option(rtsp_server_t *server, const std::shared_ptr &session, net::peer_t peer, msg_t&& req) { + OPTION_ITEM option {}; + + // I know these string literals will not be modified + option.option = const_cast("CSeq"); + + auto seqn_str = std::to_string(req->sequenceNumber); + option.content = const_cast(seqn_str.c_str()); + + respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); +} + +void cmd_describe(rtsp_server_t *server, const std::shared_ptr &session, net::peer_t peer, msg_t&& req) { + OPTION_ITEM option {}; + + // I know these string literals will not be modified + option.option = const_cast("CSeq"); + + auto seqn_str = std::to_string(req->sequenceNumber); + option.content = const_cast(seqn_str.c_str()); + + std::string_view payload; + if(config::video.hevc_mode == 0) { + payload = "surround-params=NONE"sv; + } + else { + payload = "sprop-parameter-sets=AAAAAU;surround-params=NONE"sv; + } + + respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, payload); +} + +void cmd_setup(rtsp_server_t *server, const std::shared_ptr &session, net::peer_t peer, msg_t &&req) { + OPTION_ITEM options[2] {}; + + auto &seqn = options[0]; + auto &session_option = options[1]; + + seqn.option = const_cast("CSeq"); + + auto seqn_str = std::to_string(req->sequenceNumber); + seqn.content = const_cast(seqn_str.c_str()); + + if(session->idr_events) { + // already streaming + + respond(server->host(), peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {}); + return; + } + + std::string_view target { req->message.request.target }; + auto begin = std::find(std::begin(target), std::end(target), '=') + 1; + auto end = std::find(begin, std::end(target), '/'); + std::string_view type { begin, (size_t)std::distance(begin, end) }; + + if(type == "audio"sv) { + seqn.next = &session_option; + + session_option.option = const_cast("Session"); + session_option.content = const_cast("DEADBEEFCAFE;timeout = 90"); + } + else if(type != "video"sv && type != "control"sv) { + cmd_not_found(server->host(), peer, std::move(req)); + + return; + } + + respond(server->host(), peer, &seqn, 200, "OK", req->sequenceNumber, {}); +} + +void cmd_announce(rtsp_server_t *server, const std::shared_ptr &session, net::peer_t peer, msg_t &&req) { + OPTION_ITEM option {}; + + // I know these string literals will not be modified + option.option = const_cast("CSeq"); + + auto seqn_str = std::to_string(req->sequenceNumber); + option.content = const_cast(seqn_str.c_str()); + + auto expected_state = state_e::STOPPED; + auto abort = session->state.compare_exchange_strong(expected_state, state_e::STARTING); + + if(abort || !launch_event.peek()) { + //Either already streaming or /launch has not been used + + if(!abort) { + session->state.store(state_e::STOPPED); + } + + respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); + return; + } + auto launch_session { launch_event.pop() }; + + std::string_view payload { req->payload, (size_t)req->payloadLength }; + + std::vector lines; + + auto whitespace = [](char ch) { + return ch == '\n' || ch == '\r'; + }; + + { + auto pos = std::begin(payload); + auto begin = pos; + while (pos != std::end(payload)) { + if (whitespace(*pos++)) { + lines.emplace_back(begin, pos - begin - 1); + + while(pos != std::end(payload) && whitespace(*pos)) { ++pos; } + begin = pos; + } + } + } + + std::string_view client; + std::unordered_map args; + + for(auto line : lines) { + auto type = line.substr(0, 2); + if(type == "s="sv) { + client = line.substr(2); + } + else if(type == "a=") { + auto pos = line.find(':'); + + auto name = line.substr(2, pos - 2); + auto val = line.substr(pos + 1); + + if(val[val.size() -1] == ' ') { + val = val.substr(0, val.size() -1); + } + args.emplace(name, val); + } + } + + // Initialize any omitted parameters to defaults + args.try_emplace("x-nv-video[0].encoderCscMode"sv, "0"sv); + args.try_emplace("x-nv-vqos[0].bitStreamFormat"sv, "0"sv); + args.try_emplace("x-nv-video[0].dynamicRangeMode"sv, "0"sv); + args.try_emplace("x-nv-aqos.packetDuration"sv, "5"sv); + + try { + + auto &config = session->config; + config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv)); + config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv)); + config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv)); + + config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv)); + + config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv)); + config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv)); + config.monitor.framerate = util::from_view(args.at("x-nv-video[0].maxFPS"sv)); + config.monitor.bitrate = util::from_view(args.at("x-nv-vqos[0].bw.maximumBitrateKbps"sv)); + config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv)); + config.monitor.numRefFrames = util::from_view(args.at("x-nv-video[0].maxNumReferenceFrames"sv)); + config.monitor.encoderCscMode = util::from_view(args.at("x-nv-video[0].encoderCscMode"sv)); + config.monitor.videoFormat = util::from_view(args.at("x-nv-vqos[0].bitStreamFormat"sv)); + config.monitor.dynamicRange = util::from_view(args.at("x-nv-video[0].dynamicRangeMode"sv)); + + } catch(std::out_of_range &) { + + respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + return; + } + + if(session->config.monitor.videoFormat != 0 && config::video.hevc_mode == 0) { + BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv; + + respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + return; + } + + auto &gcm_key = launch_session->gcm_key; + auto &iv = launch_session->iv; + + std::copy(std::begin(gcm_key), std::end(gcm_key), std::begin(session->gcm_key)); + std::copy(std::begin(iv), std::end(iv), std::begin(session->iv)); + + session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; + + session->idr_events = std::make_shared(); + + session->audioThread = std::thread {audioThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)}; + session->videoThread = std::thread {videoThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)}; + + session->state.store(state_e::RUNNING); + respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); +} + +void cmd_play(rtsp_server_t *server, const std::shared_ptr &session, net::peer_t peer, msg_t &&req) { + OPTION_ITEM option {}; + + // I know these string literals will not be modified + option.option = const_cast("CSeq"); + + auto seqn_str = std::to_string(req->sequenceNumber); + option.content = const_cast(seqn_str.c_str()); + + respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); +} + +void rtpThread(std::shared_ptr> shutdown_event) { + input = std::make_shared(); + auto fg = util::fail_guard([&]() { + input.reset(); + }); + + rtsp_server_t server(RTSP_SETUP_PORT); + + server.map("OPTIONS"sv, &cmd_option); + server.map("DESCRIBE"sv, &cmd_describe); + server.map("SETUP"sv, &cmd_setup); + server.map("ANNOUNCE"sv, &cmd_announce); + + server.map("PLAY"sv, &cmd_play); + + while(!shutdown_event->peek()) { + server.iterate(std::min(500ms, config::stream.ping_timeout)); + } + + server.stop(); +} + +void print_msg(PRTSP_MESSAGE msg) { + std::string_view type = msg->type == TYPE_RESPONSE ? "RESPONSE"sv : "REQUEST"sv; + + std::string_view payload { msg->payload, (size_t)msg->payloadLength }; + std::string_view protocol { msg->protocol }; + auto seqnm = msg->sequenceNumber; + std::string_view messageBuffer { msg->messageBuffer }; + + BOOST_LOG(debug) << "type ["sv << type << ']'; + BOOST_LOG(debug) << "sequence number ["sv << seqnm << ']'; + BOOST_LOG(debug) << "protocol :: "sv << protocol; + BOOST_LOG(debug) << "payload :: "sv << payload; + + if(msg->type == TYPE_RESPONSE) { + auto &resp = msg->message.response; + + auto statuscode = resp.statusCode; + std::string_view status { resp.statusString }; + + BOOST_LOG(debug) << "statuscode :: "sv << statuscode; + BOOST_LOG(debug) << "status :: "sv << status; + } + else { + auto& req = msg->message.request; + + std::string_view command { req.command }; + std::string_view target { req.target }; + + BOOST_LOG(debug) << "command :: "sv << command; + BOOST_LOG(debug) << "target :: "sv << target; + } + + for(auto option = msg->options; option != nullptr; option = option->next) { + std::string_view content { option->content }; + std::string_view name { option->option }; + + BOOST_LOG(debug) << name << " :: "sv << content; + } + + BOOST_LOG(debug) << "---Begin MessageBuffer---"sv << std::endl << messageBuffer << std::endl << "---End MessageBuffer---"sv << std::endl; +} +} \ No newline at end of file diff --git a/sunshine/rtsp.h b/sunshine/rtsp.h new file mode 100644 index 00000000..f94ed36b --- /dev/null +++ b/sunshine/rtsp.h @@ -0,0 +1,25 @@ +// +// Created by loki on 2/2/20. +// + +#ifndef SUNSHINE_RTSP_H +#define SUNSHINE_RTSP_H + +#include + +#include "crypto.h" +#include "thread_safe.h" + +namespace stream { +struct launch_session_t { + crypto::aes_t gcm_key; + crypto::aes_t iv; +}; + +extern safe::event_t launch_event; + +void rtpThread(std::shared_ptr> shutdown_event); + +} + +#endif //SUNSHINE_RTSP_H diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 4d79ba27..efde8b13 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -4,35 +4,22 @@ #include "process.h" -#include -#if ((BOOST_VERSION / 1000) >= 107) -#define EXECUTOR(x) (x->get_executor()) -#else -#define EXECUTOR(x) (x->get_io_service()) -#endif - - #include #include -#include -#include + #include #include extern "C" { -#include #include -#include #include } +#include "network.h" #include "config.h" #include "utility.h" #include "stream.h" -#include "audio.h" -#include "video.h" #include "thread_safe.h" -#include "crypto.h" #include "input.h" #include "main.h" @@ -56,21 +43,10 @@ static const short packetTypes[] = { 0x0100, // Termination }; -namespace asio = boost::asio; -namespace sys = boost::system; - -using asio::ip::tcp; -using asio::ip::udp; - using namespace std::literals; namespace stream { -constexpr auto RTSP_SETUP_PORT = 48010; -constexpr auto VIDEO_STREAM_PORT = 47998; -constexpr auto CONTROL_PORT = 47999; -constexpr auto AUDIO_STREAM_PORT = 48000; - #pragma pack(push, 1) struct video_packet_raw_t { @@ -92,174 +68,31 @@ struct audio_packet_raw_t { #pragma pack(pop) -safe::event_t launch_event; +using rh_t = util::safe_ptr; +using video_packet_t = util::c_ptr; +using audio_packet_t = util::c_ptr; + +using session_queue_t = std::shared_ptr>>>; + +int start_broadcast(broadcast_ctx_t &ctx); +void end_broadcast(broadcast_ctx_t &ctx); std::shared_ptr input; - -struct config_t { - audio::config_t audio; - video::config_t monitor; - int packetsize; - - bool sops; - std::optional gcmap; -}; - -struct session_t { - config_t config; - - std::thread audioThread; - std::thread videoThread; - std::thread controlThread; - - std::chrono::steady_clock::time_point pingTimeout; - - video::packet_queue_t video_packets; - audio::packet_queue_t audio_packets; - - crypto::aes_t gcm_key; - crypto::aes_t iv; - - bool has_process; -} session; -std::atomic session_state; - -void free_host(ENetHost *host) { - std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) { - ENetPeer *peer = &peer_ref; - - if(peer) { - enet_peer_disconnect_now(peer, 0); - } - }); - - enet_host_destroy(host); -} - -void free_msg(PRTSP_MESSAGE msg) { - freeMessage(msg); - - delete msg; -} - -using msg_t = util::safe_ptr; -using packet_t = util::safe_ptr; -using host_t = util::safe_ptr; -using peer_t = ENetPeer*; -using rh_t = util::safe_ptr; -using video_packet_t = util::safe_ptr; -using audio_packet_t = util::safe_ptr; - -host_t host_create(ENetAddress &addr, std::uint16_t port) { - enet_address_set_host(&addr, "0.0.0.0"); - enet_address_set_port(&addr, port); - - return host_t { enet_host_create(PF_INET, &addr, 1, 1, 0, 0) }; -} - -void print_msg(PRTSP_MESSAGE msg); -void cmd_not_found(host_t &host, peer_t peer, msg_t&& req); +static auto broadcast = safe::make_shared(start_broadcast, end_broadcast); void stop(session_t &session) { - session.video_packets->stop(); - session.audio_packets->stop(); + session.idr_events->stop(); auto expected = state_e::RUNNING; - session_state.compare_exchange_strong(expected, state_e::STOPPING); + session.state.compare_exchange_strong(expected, state_e::STOPPING); } -class rtsp_server_t { -public: - rtsp_server_t(rtsp_server_t &&) noexcept = default; - rtsp_server_t &operator=(rtsp_server_t &&) noexcept = default; - - explicit rtsp_server_t(std::uint16_t port) : _host { host_create(_addr, port) } {} - - template - void iterate(std::chrono::duration timeout) { - ENetEvent event; - auto res = enet_host_service(_host.get(), &event, std::chrono::floor(timeout).count()); - - if(res > 0) { - switch(event.type) { - case ENET_EVENT_TYPE_RECEIVE: - { - packet_t packet { event.packet }; - peer_t peer { event.peer }; - - msg_t req { new RTSP_MESSAGE {} }; - - //TODO: compare addresses of the peers - if(_queue_packet.second == nullptr) { - parseRtspMessage(req.get(), (char*)packet->data, packet->dataLength); - for(auto option = req->options; option != nullptr; option = option->next) { - if("Content-length"sv == option->option) { - _queue_packet = std::make_pair(peer, std::move(packet)); - return; - } - } - } - else { - std::vector full_payload; - - auto old_msg = std::move(_queue_packet); - TUPLE_2D_REF(_, old_packet, old_msg); - - std::string_view new_payload { (char*)packet->data, packet->dataLength }; - std::string_view old_payload { (char*)old_packet->data, old_packet->dataLength }; - full_payload.resize(new_payload.size() + old_payload.size()); - - std::copy(std::begin(old_payload), std::end(old_payload), std::begin(full_payload)); - std::copy(std::begin(new_payload), std::end(new_payload), std::begin(full_payload) + old_payload.size()); - - parseRtspMessage(req.get(), full_payload.data(), full_payload.size()); - } - - print_msg(req.get()); - - msg_t resp; - auto func = _map_cmd_cb.find(req->message.request.command); - if(func != std::end(_map_cmd_cb)) { - func->second(_host, peer, std::move(req)); - } - else { - cmd_not_found(_host, peer, std::move(req)); - } - - return; - } - break; - case ENET_EVENT_TYPE_CONNECT: - BOOST_LOG(info) << "CLIENT CONNECTED TO RTSP"sv; - break; - case ENET_EVENT_TYPE_DISCONNECT: - BOOST_LOG(info) << "CLIENT DISCONNECTED FROM RTSP"sv; - break; - case ENET_EVENT_TYPE_NONE: - break; - } - } - } - - void map(const std::string_view &type, std::function cb); -private: - void _respond(peer_t &peer, msg_t &msg); - - // named _queue_packet because I want to make it an actual queue - // It's like this for convenience sake - std::pair _queue_packet; - - std::unordered_map> _map_cmd_cb; - ENetAddress _addr; - host_t _host; -}; - class control_server_t { public: control_server_t(control_server_t &&) noexcept = default; control_server_t &operator=(control_server_t &&) noexcept = default; - explicit control_server_t(std::uint16_t port) : _host { host_create(_addr, port) } {} + explicit control_server_t(std::uint16_t port) : _host { net::host_create(_addr, config::stream.channels, port) } {} template void iterate(std::chrono::duration timeout) { @@ -267,10 +100,37 @@ public: auto res = enet_host_service(_host.get(), &event, std::chrono::floor(timeout).count()); if(res > 0) { + while(session_queue->peek()) { + auto session_opt = session_queue->pop(); + if(!session_opt) { + return; + } + + TUPLE_2D_REF(addr_string, session, *session_opt); + + if(session) { + _map_addr_session.try_emplace(addr_string, session); + } + else { + _map_addr_session.erase(addr_string); + } + } + auto addr_string = platf::from_sockaddr((sockaddr*)&event.peer->address.address); + + auto it = _map_addr_session.find(addr_string); + if(it == std::end(_map_addr_session)) { + BOOST_LOG(warning) << "Rejected connection from ["sv << addr_string << "]: it's not properly set up"sv; + enet_peer_disconnect_now(event.peer, 0); + + return; + } + + auto &session = it->second; + switch(event.type) { case ENET_EVENT_TYPE_RECEIVE: { - packet_t packet { event.packet }; + net::packet_t packet { event.packet }; std::uint16_t *type = (std::uint16_t *)packet->data; std::string_view payload { (char*)packet->data + sizeof(*type), packet->dataLength - sizeof(*type) }; @@ -284,7 +144,7 @@ public: } else { - cb->second(payload); + cb->second(session.get(), payload); } } break; @@ -294,8 +154,8 @@ public: case ENET_EVENT_TYPE_DISCONNECT: BOOST_LOG(info) << "CLIENT DISCONNECTED"sv; // No more clients to send video data to ^_^ - if(session_state == state_e::RUNNING) { - stop(session); + if(session->state == state_e::RUNNING) { + stop(*session); } break; case ENET_EVENT_TYPE_NONE: @@ -303,12 +163,20 @@ public: } } } - void map(uint16_t type, std::function cb); + + void map(uint16_t type, std::function cb) { + _map_type_cb.emplace(type, std::move(cb)); + } + void send(const std::string_view &payload); -private: - std::unordered_map> _map_type_cb; + + std::unordered_map> _map_type_cb; + std::unordered_map> _map_addr_session; + + session_queue_t session_queue; + ENetAddress _addr; - host_t _host; + net::host_t _host; }; namespace fec { @@ -402,48 +270,6 @@ std::vector insert(uint64_t insert_size, uint64_t slice_size, const std return result; } -void print_msg(PRTSP_MESSAGE msg) { - std::string_view type = msg->type == TYPE_RESPONSE ? "RESPONSE"sv : "REQUEST"sv; - - std::string_view payload { msg->payload, (size_t)msg->payloadLength }; - std::string_view protocol { msg->protocol }; - auto seqnm = msg->sequenceNumber; - std::string_view messageBuffer { msg->messageBuffer }; - - BOOST_LOG(debug) << "type ["sv << type << ']'; - BOOST_LOG(debug) << "sequence number ["sv << seqnm << ']'; - BOOST_LOG(debug) << "protocol :: "sv << protocol; - BOOST_LOG(debug) << "payload :: "sv << payload; - - if(msg->type == TYPE_RESPONSE) { - auto &resp = msg->message.response; - - auto statuscode = resp.statusCode; - std::string_view status { resp.statusString }; - - BOOST_LOG(debug) << "statuscode :: "sv << statuscode; - BOOST_LOG(debug) << "status :: "sv << status; - } - else { - auto& req = msg->message.request; - - std::string_view command { req.command }; - std::string_view target { req.target }; - - BOOST_LOG(debug) << "command :: "sv << command; - BOOST_LOG(debug) << "target :: "sv << target; - } - - for(auto option = msg->options; option != nullptr; option = option->next) { - std::string_view content { option->content }; - std::string_view name { option->option }; - - BOOST_LOG(debug) << name << " :: "sv << content; - } - - BOOST_LOG(debug) << "---Begin MessageBuffer---"sv << std::endl << messageBuffer << std::endl << "---End MessageBuffer---"sv << std::endl; -} - std::vector replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) { std::vector replaced; @@ -457,14 +283,6 @@ std::vector replace(const std::string_view &original, const std::string return replaced; } -void rtsp_server_t::map(const std::string_view& cmd, std::function cb) { - _map_cmd_cb.emplace(cmd, std::move(cb)); -} - -void control_server_t::map(uint16_t type, std::function cb) { - _map_type_cb.emplace(type, std::move(cb)); -} - void control_server_t::send(const std::string_view & payload) { std::for_each(_host->peers, _host->peers + _host->peerCount, [payload](auto &peer) { auto packet = enet_packet_create(payload.data(), payload.size(), ENET_PACKET_FLAG_RELIABLE); @@ -476,24 +294,18 @@ void control_server_t::send(const std::string_view & payload) { enet_host_flush(_host.get()); } -void controlThread(video::idr_event_t idr_events) { +void controlBroadcastThread(safe::event_t *shutdown_event) { control_server_t server { CONTROL_PORT }; - server.map(packetTypes[IDX_START_A], [](const std::string_view &payload) { - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - + server.map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_START_A]"sv; }); - server.map(packetTypes[IDX_START_B], [](const std::string_view &payload) { - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - + server.map(packetTypes[IDX_START_B], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_START_B]"sv; }); - server.map(packetTypes[IDX_LOSS_STATS], [](const std::string_view &payload) { - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - + server.map(packetTypes[IDX_LOSS_STATS], [&](session_t *session, const std::string_view &payload) { int32_t *stats = (int32_t*)payload.data(); auto count = stats[0]; std::chrono::milliseconds t { stats[1] }; @@ -509,10 +321,8 @@ void controlThread(video::idr_event_t idr_events) { << "---end stats---"; }); - server.map(packetTypes[IDX_INVALIDATE_REF_FRAMES], [idr_events](const std::string_view &payload) { - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - - std::int64_t *frames = (std::int64_t *) payload.data(); + server.map(packetTypes[IDX_INVALIDATE_REF_FRAMES], [&](session_t *session, const std::string_view &payload) { + std::int64_t *frames = (std::int64_t *)payload.data(); auto firstFrame = frames[0]; auto lastFrame = frames[1]; @@ -521,49 +331,47 @@ void controlThread(video::idr_event_t idr_events) { << "firstFrame [" << firstFrame << ']' << std::endl << "lastFrame [" << lastFrame << ']'; - idr_events->raise(std::make_pair(firstFrame, lastFrame)); + session->idr_events->raise(std::make_pair(firstFrame, lastFrame)); }); - server.map(packetTypes[IDX_INPUT_DATA], [](const std::string_view &payload) mutable { - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; + server.map(packetTypes[IDX_INPUT_DATA], [&](session_t *session, const std::string_view &payload) { + session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; BOOST_LOG(debug) << "type [IDX_INPUT_DATA]"sv; int32_t tagged_cipher_length = util::endian::big(*(int32_t*)payload.data()); std::string_view tagged_cipher { payload.data() + sizeof(tagged_cipher_length), (size_t)tagged_cipher_length }; - crypto::cipher_t cipher { session.gcm_key }; + crypto::cipher_t cipher { session->gcm_key }; cipher.padding = false; std::vector plaintext; - if(cipher.decrypt_gcm(session.iv, tagged_cipher, plaintext)) { + if(cipher.decrypt_gcm(session->iv, tagged_cipher, plaintext)) { // something went wrong :( BOOST_LOG(error) << "Failed to verify tag"sv; - stop(session); + stop(*session); } - if(tagged_cipher_length >= 16 + session.iv.size()) { - std::copy(payload.end() - 16, payload.end(), std::begin(session.iv)); + if(tagged_cipher_length >= 16 + session->iv.size()) { + std::copy(payload.end() - 16, payload.end(), std::begin(session->iv)); } input::print(plaintext.data()); input::passthrough(input, std::move(plaintext)); }); - while(session_state == state_e::STARTING) { - std::this_thread::sleep_for(1ms); - } - - while(session_state == state_e::RUNNING) { - if(std::chrono::steady_clock::now() > session.pingTimeout) { - BOOST_LOG(debug) << "Ping timeout"sv; - - stop(session); + while(!shutdown_event->peek()) { + auto now = std::chrono::steady_clock::now(); + for(auto &[addr,session] : server._map_addr_session) { + if(now > session->pingTimeout) { + BOOST_LOG(info) << addr << ": Ping Timeout"sv; + stop(*session); + } } - if(session.has_process && proc::proc.running() == -1) { + if(proc::proc.running() == -1) { BOOST_LOG(debug) << "Process terminated"sv; std::uint16_t reason = 0x0100; @@ -573,140 +381,88 @@ void controlThread(video::idr_event_t idr_events) { payload[1] = reason; server.send(std::string_view {(char*)payload.data(), payload.size()}); - - stop(session); } server.iterate(500ms); } } -template -util::Either asio_read(Stream &s, asio::io_service &io, const BufferSequence &bufs, Peer &peer, const asio::deadline_timer::duration_type& expire_time) { - std::optional timer_result, read_result; +void recvThread(broadcast_ctx_t &ctx) { + std::map peer_to_video_session; + std::map peer_to_audio_session; + std::map peer_to_control_session; - asio::deadline_timer timer { io }; + auto &video_sock = ctx.video_sock; + auto &audio_sock = ctx.audio_sock; - timer.expires_from_now(boost::posix_time::milliseconds(config::stream.ping_timeout.count())); - timer.async_wait([&](sys::error_code c){ - timer_result = c; - }); - - std::size_t len = 0; - s.async_receive_from(bufs, peer, 0, [&](const boost::system::error_code &ec, size_t bytes) { - len = bytes; - - read_result = ec; - }); - - io.reset(); - - while(io.run_one()) { - if(read_result) { - timer.cancel(); - } - else if(timer_result) { - s.cancel(); - } - } - - if(*read_result) { - return *read_result; - } - - return len; -} - -template -std::optional recv_peer(std::shared_ptr> &queue, udp::socket &sock, asio::io_service &io) { - std::array buf; - - char ping[] = { - 0x50, 0x49, 0x4E, 0x47 - }; + auto &session_queue = ctx.session_queue; + auto &io = ctx.io; udp::endpoint peer; - while (queue->running()) { - auto len_or_err = asio_read(sock, io, asio::buffer(buf), peer, boost::posix_time::milliseconds(config::stream.ping_timeout.count())); + std::array buf[2]; - if(len_or_err.has_right() || len_or_err.left() == 0) { - return std::nullopt; + auto recv_func_factory = [&](udp::socket &sock, int buf_elem, std::map &peer_to_session) { + std::function recv_func = [&](const boost::system::error_code &ec, size_t bytes) { + if(ec || !bytes) { + BOOST_LOG(fatal) << "Couldn't receive data from udp socket: "sv << ec.message(); + + log_flush(); + std::abort(); + } + + auto it = peer_to_session.find(peer.address()); + if(it != std::end(peer_to_session)) { + it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes }); + } + + sock.async_receive_from(asio::buffer(buf[buf_elem]), peer, 0, recv_func); + }; + + return recv_func; + }; + + video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func_factory(video_sock, 0, peer_to_video_session)); + audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func_factory(audio_sock, 1, peer_to_audio_session)); + + while(!ctx.shutdown_event.peek()) { + while(session_queue->peek()) { + auto message_queue_opt = session_queue->pop(); + TUPLE_3D_REF(socket_type, addr, message_queue, *message_queue_opt); + + switch(socket_type) { + case socket_e::video: + if(message_queue) { + peer_to_video_session.emplace(addr, message_queue); + } + else { + peer_to_video_session.erase(addr); + } + break; + case socket_e::audio: + if(message_queue) { + peer_to_audio_session.emplace(addr, message_queue); + } + else { + peer_to_audio_session.erase(addr); + } + break; + } } - auto len = len_or_err.left(); - if (len == 4 && !std::memcmp(ping, buf.data(), sizeof(ping))) { - BOOST_LOG(debug) << "PING from ["sv << peer.address().to_string() << ':' << peer.port() << ']'; - - return std::make_optional(std::move(peer)); - } - - BOOST_LOG(warning) << "Unknown transmission: "sv << util::hex_vec(std::string_view{buf.data(), len}); + io.run_one(); } - - return std::nullopt; } -void audioThread() { - while(session_state == state_e::STARTING) { - std::this_thread::sleep_for(1ms); - } - - auto &config = session.config; - - asio::io_service io; - udp::socket sock{io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT)}; - - auto peer = recv_peer(session.audio_packets, sock, io); - if(!peer) { - return; - } - - auto &packets = session.audio_packets; - std::thread captureThread{audio::capture, packets, config.audio}; - - uint16_t frame{1}; - - while (auto packet = packets->pop()) { - audio_packet_t audio_packet { (audio_packet_raw_t*)malloc(sizeof(audio_packet_raw_t) + packet->size()) }; - - audio_packet->rtp.header = 0; - audio_packet->rtp.packetType = 97; - audio_packet->rtp.sequenceNumber = util::endian::big(frame++); - audio_packet->rtp.timestamp = 0; - audio_packet->rtp.ssrc = 0; - - std::copy(std::begin(*packet), std::end(*packet), audio_packet->payload()); - - sock.send_to(asio::buffer((char*)audio_packet.get(), sizeof(audio_packet_raw_t) + packet->size()), *peer); - BOOST_LOG(verbose) << "Audio ["sv << frame - 1 << "] :: send..."sv; - } - - stop(session); - captureThread.join(); -} - -void videoThread(video::idr_event_t idr_events) { - while(session_state == state_e::STARTING) { - std::this_thread::sleep_for(1ms); - } - - auto &config = session.config; - +void videoBroadcastThread(safe::event_t *shutdown_event, udp::socket &sock, video::packet_queue_t packets) { int lowseq = 0; - - asio::io_service io; - udp::socket sock{io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT)}; + while(auto packet = packets->pop()) { + if(shutdown_event->peek()) { + break; + } - auto peer = recv_peer(session.video_packets, sock, io); - if(!peer) { - return; - } + auto session = (session_t*)packet->channel_data; - auto &packets = session.video_packets; - std::thread captureThread{video::capture_display, packets, idr_events, config.monitor}; - - while (auto packet = packets->pop()) { std::string_view payload{(char *) packet->data, (size_t) packet->size}; std::vector payload_new; @@ -719,32 +475,27 @@ void videoThread(video::idr_event_t idr_events) { // make sure moonlight recognizes the nalu code for IDR frames if (packet->flags & AV_PKT_FLAG_KEY) { // TODO: Not all encoders encode their IDR frames with the 4 byte NALU prefix - if(config.monitor.videoFormat == 0) { - auto h264_i_frame_old = "\000\000\001e"sv; - auto h264_i_frame = "\000\000\000\001e"sv; - assert(std::search(std::begin(payload), std::end(payload), std::begin(h264_i_frame), std::end(h264_i_frame)) == - std::end(payload)); - payload_new = replace(payload, h264_i_frame_old, h264_i_frame); - } - else { - auto hevc_i_frame_old = "\000\000\001("sv; - auto hevc_i_frame = "\000\000\000\001("sv; - assert(std::search(std::begin(payload), std::end(payload), std::begin(hevc_i_frame), std::end(hevc_i_frame)) == - std::end(payload)); - payload_new = replace(payload, hevc_i_frame_old, hevc_i_frame); + std::string_view frame_old = "\000\000\001e"sv; + std::string_view frame_new = "\000\000\000\001e"sv; + if(session->config.monitor.videoFormat != 0) { + frame_old = "\000\000\001("sv; + frame_new = "\000\000\000\001("sv; } + assert(std::search(std::begin(payload), std::end(payload), std::begin(hevc_i_frame), std::end(hevc_i_frame)) == + std::end(payload)); + payload_new = replace(payload, frame_old, frame_new); payload = {(char *) payload_new.data(), payload_new.size()}; } // insert packet headers - auto blocksize = config.packetsize + MAX_RTP_HEADER_SIZE; + auto blocksize = session->config.packetsize + MAX_RTP_HEADER_SIZE; auto payload_blocksize = blocksize - sizeof(video_packet_raw_t); auto fecPercentage = config::stream.fec_percentage; payload_new = insert(sizeof(video_packet_raw_t), payload_blocksize, - payload, [&](void *p, int fecIndex, int end) { + payload, [&](void *p, int fecIndex, int end) { video_packet_raw_t *video_packet = (video_packet_raw_t *)p; video_packet->packet.flags = FLAG_CONTAINS_PIC_DATA; @@ -754,7 +505,7 @@ void videoThread(video::idr_event_t idr_events) { fecIndex << 12 | end << 22 | fecPercentage << 4 - ); + ); if(fecIndex == 0) { video_packet->packet.flags |= FLAG_SOF; @@ -789,7 +540,7 @@ void videoThread(video::idr_event_t idr_events) { } for (auto x = 0; x < shards.size(); ++x) { - sock.send_to(asio::buffer(shards[x]), *peer); + sock.send_to(asio::buffer(shards[x]), session->video_peer); } if(packet->flags & AV_PKT_FLAG_KEY) { @@ -802,326 +553,147 @@ void videoThread(video::idr_event_t idr_events) { lowseq += shards.size(); } - stop(session); - captureThread.join(); + shutdown_event->raise(true); } -void respond(host_t &host, peer_t peer, msg_t &resp) { - auto payload = std::make_pair(resp->payload, resp->payloadLength); +void audioBroadcastThread(safe::event_t *shutdown_event, udp::socket &sock, audio::packet_queue_t packets) { + uint16_t frame{1}; - auto lg = util::fail_guard([&]() { - resp->payload = payload.first; - resp->payloadLength = payload.second; - }); - - resp->payload = nullptr; - resp->payloadLength = 0; - - int serialized_len; - util::c_ptr raw_resp { serializeRtspMessage(resp.get(), &serialized_len) }; - BOOST_LOG(debug) - << "---Begin Response---"sv << std::endl - << std::string_view { raw_resp.get(), (std::size_t)serialized_len } << std::endl - << std::string_view { payload.first, (std::size_t)payload.second } << std::endl - << "---End Response---"sv << std::endl; - - std::string_view tmp_resp { raw_resp.get(), (size_t)serialized_len }; - - { - auto packet = enet_packet_create(tmp_resp.data(), tmp_resp.size(), ENET_PACKET_FLAG_RELIABLE); - if(enet_peer_send(peer, 0, packet)) { - enet_packet_destroy(packet); - return; + while (auto packet = packets->pop()) { + if(shutdown_event->peek()) { + break; } - enet_host_flush(host.get()); + TUPLE_2D_REF(session, packet_data, *packet); + audio_packet_t audio_packet { (audio_packet_raw_t*)malloc(sizeof(audio_packet_raw_t) + packet_data.size()) }; + + audio_packet->rtp.header = 0; + audio_packet->rtp.packetType = 97; + audio_packet->rtp.sequenceNumber = util::endian::big(frame++); + audio_packet->rtp.timestamp = 0; + audio_packet->rtp.ssrc = 0; + + std::copy(std::begin(packet_data), std::end(packet_data), audio_packet->payload()); + + sock.send_to(asio::buffer((char*)audio_packet.get(), sizeof(audio_packet_raw_t) + packet_data.size()), ((session_t*)session)->audio_peer); + BOOST_LOG(verbose) << "Audio ["sv << frame - 1 << "] :: send..."sv; } - if(payload.second > 0) { - auto packet = enet_packet_create(payload.first, payload.second, ENET_PACKET_FLAG_RELIABLE);; - if(enet_peer_send(peer, 0, packet)) { - enet_packet_destroy(packet); - return; - } - - enet_host_flush(host.get()); - } + shutdown_event->raise(true); } -void respond(host_t &host, peer_t peer, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { - msg_t resp { new msg_t::element_type }; - createRtspResponse(resp.get(), nullptr, 0, const_cast("RTSP/1.0"), statuscode, const_cast(status_msg), seqn, options, const_cast(payload.data()), (int)payload.size()); +int start_broadcast(broadcast_ctx_t &ctx) { + ctx.video_packets = std::make_shared(); + ctx.audio_packets = std::make_shared(); - respond(host, peer, resp); + ctx.video_thread = std::thread { videoBroadcastThread, &ctx.shutdown_event, std::ref(ctx.video_sock), ctx.video_packets }; + ctx.audio_thread = std::thread { audioBroadcastThread, &ctx.shutdown_event, std::ref(ctx.audio_sock), ctx.audio_packets }; + ctx.control_thread = std::thread { controlBroadcastThread, &ctx.shutdown_event }; + + ctx.recv_thread = std::thread { recvThread, std::ref(ctx) }; + + return 0; } -void cmd_not_found(host_t &host, peer_t peer, msg_t&& req) { - respond(host, peer, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); +void end_broadcast(broadcast_ctx_t &ctx) { + ctx.shutdown_event.raise(true); + ctx.video_packets->stop(); + ctx.audio_packets->stop(); + + ctx.session_queue->stop(); + + ctx.video_sock.cancel(); + ctx.audio_sock.cancel(); + + BOOST_LOG(debug) << "Waiting for video thread to end..."sv; + ctx.video_thread.join(); + BOOST_LOG(debug) << "Waiting for audio thread to end..."sv; + ctx.audio_thread.join(); + BOOST_LOG(debug) << "Waiting for control thread to end..."sv; + ctx.control_thread.join(); + BOOST_LOG(debug) << "All broadcasting threads ended"sv; + + ctx.video_packets.reset(); + ctx.audio_packets.reset(); } -void cmd_option(host_t &host, peer_t peer, msg_t&& req) { - OPTION_ITEM option {}; - - // I know these string literals will not be modified - option.option = const_cast("CSeq"); - - auto seqn_str = std::to_string(req->sequenceNumber); - option.content = const_cast(seqn_str.c_str()); - - respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); -} - -void cmd_describe(host_t &host, peer_t peer, msg_t&& req) { - OPTION_ITEM option {}; - - // I know these string literals will not be modified - option.option = const_cast("CSeq"); - - auto seqn_str = std::to_string(req->sequenceNumber); - option.content = const_cast(seqn_str.c_str()); - - std::string_view payload; - if(config::video.hevc_mode == 0) { - payload = "surround-params=NONE"sv; - } - else { - payload = "sprop-parameter-sets=AAAAAU;surround-params=NONE"sv; - } - - respond(host, peer, &option, 200, "OK", req->sequenceNumber, payload); -} - -void cmd_setup(host_t &host, peer_t peer, msg_t &&req) { - OPTION_ITEM options[2] {}; - - auto &seqn = options[0]; - auto &session_option = options[1]; - - seqn.option = const_cast("CSeq"); - - auto seqn_str = std::to_string(req->sequenceNumber); - seqn.content = const_cast(seqn_str.c_str()); - - if(session.video_packets) { - // already streaming - - respond(host, peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {}); - return; - } - - std::string_view target { req->message.request.target }; - auto begin = std::find(std::begin(target), std::end(target), '=') + 1; - auto end = std::find(begin, std::end(target), '/'); - std::string_view type { begin, (size_t)std::distance(begin, end) }; - - if(type == "audio"sv) { - seqn.next = &session_option; - - session_option.option = const_cast("Session"); - session_option.content = const_cast("DEADBEEFCAFE;timeout = 90"); - } - else if(type != "video"sv && type != "control"sv) { - cmd_not_found(host, peer, std::move(req)); - - return; - } - - respond(host, peer, &seqn, 200, "OK", req->sequenceNumber, {}); -} - -void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { - OPTION_ITEM option {}; - - // I know these string literals will not be modified - option.option = const_cast("CSeq"); - - auto seqn_str = std::to_string(req->sequenceNumber); - option.content = const_cast(seqn_str.c_str()); - - auto expected_state = state_e::STOPPED; - auto abort = !session_state.compare_exchange_strong(expected_state, state_e::STARTING); - - if(abort || !launch_event.peek()) { - //Either already streaming or /launch has not been used - - if(!abort) { - session_state.store(state_e::STOPPED); - } - - respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); - return; - } - auto launch_session { launch_event.pop() }; - - std::string_view payload { req->payload, (size_t)req->payloadLength }; - - std::vector lines; - - auto whitespace = [](char ch) { - return ch == '\n' || ch == '\r'; +int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) { + constexpr char ping[] = { + 0x50, 0x49, 0x4E, 0x47 }; - { - auto pos = std::begin(payload); - auto begin = pos; - while (pos != std::end(payload)) { - if (whitespace(*pos++)) { - lines.emplace_back(begin, pos - begin - 1); + auto messages = std::make_shared(); + ref->session_queue->raise(std::make_tuple(type, addr, messages)); - while(pos != std::end(payload) && whitespace(*pos)) { ++pos; } - begin = pos; - } - } - } - - std::string_view client; - std::unordered_map args; - - for(auto line : lines) { - auto type = line.substr(0, 2); - if(type == "s="sv) { - client = line.substr(2); - } - else if(type == "a=") { - auto pos = line.find(':'); - - auto name = line.substr(2, pos - 2); - auto val = line.substr(pos + 1); - - if(val[val.size() -1] == ' ') { - val = val.substr(0, val.size() -1); - } - args.emplace(name, val); - } - } - - // Initialize any omitted parameters to defaults - args.try_emplace("x-nv-video[0].encoderCscMode"sv, "0"sv); - args.try_emplace("x-nv-vqos[0].bitStreamFormat"sv, "0"sv); - args.try_emplace("x-nv-video[0].dynamicRangeMode"sv, "0"sv); - args.try_emplace("x-nv-aqos.packetDuration"sv, "5"sv); - - try { - - auto &config = session.config; - config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv)); - config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv)); - config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv)); - - config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv)); - - config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv)); - config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv)); - config.monitor.framerate = util::from_view(args.at("x-nv-video[0].maxFPS"sv)); - config.monitor.bitrate = util::from_view(args.at("x-nv-vqos[0].bw.maximumBitrateKbps"sv)); - config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv)); - config.monitor.numRefFrames = util::from_view(args.at("x-nv-video[0].maxNumReferenceFrames"sv)); - config.monitor.encoderCscMode = util::from_view(args.at("x-nv-video[0].encoderCscMode"sv)); - config.monitor.videoFormat = util::from_view(args.at("x-nv-vqos[0].bitStreamFormat"sv)); - config.monitor.dynamicRange = util::from_view(args.at("x-nv-video[0].dynamicRangeMode"sv)); - - } catch(std::out_of_range &) { - - respond(host, peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); - return; - } - - if(session.config.monitor.videoFormat != 0 && config::video.hevc_mode == 0) { - BOOST_LOG(error) << "HEVC is disabled, yet the client requested HEVC"sv; - - respond(host, peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); - return; - } - - - auto &gcm_key = launch_session->gcm_key; - auto &iv = launch_session->iv; - - std::copy(std::begin(gcm_key), std::end(gcm_key), std::begin(session.gcm_key)); - std::copy(std::begin(iv), std::end(iv), std::begin(session.iv)); - - session.has_process = launch_session->has_process; - - session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - - session.video_packets = std::make_shared(); - session.audio_packets = std::make_shared(); - - video::idr_event_t idr_events {new video::idr_event_t::element_type }; - - session.audioThread = std::thread {audioThread}; - session.videoThread = std::thread {videoThread, idr_events}; - session.controlThread = std::thread {controlThread, idr_events}; - - session_state.store(state_e::RUNNING); - respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); -} - -void cmd_play(host_t &host, peer_t peer, msg_t &&req) { - OPTION_ITEM option {}; - - // I know these string literals will not be modified - option.option = const_cast("CSeq"); - - auto seqn_str = std::to_string(req->sequenceNumber); - option.content = const_cast(seqn_str.c_str()); - - respond(host, peer, &option, 200, "OK", req->sequenceNumber, {}); -} - -void rtpThread(std::shared_ptr> shutdown_event) { - input = std::make_shared(); auto fg = util::fail_guard([&]() { - input.reset(); + // remove message queue from session + ref->session_queue->raise(std::make_tuple(type, addr, nullptr)); }); - rtsp_server_t server(RTSP_SETUP_PORT); + auto msg_opt = messages->pop(config::stream.ping_timeout); + messages->stop(); - server.map("OPTIONS"sv, &cmd_option); - server.map("DESCRIBE"sv, &cmd_describe); - server.map("SETUP"sv, &cmd_setup); - server.map("ANNOUNCE"sv, &cmd_announce); + if(!msg_opt) { + BOOST_LOG(error) << "Ping Timeout"sv; - server.map("PLAY"sv, &cmd_play); - - while(!shutdown_event->peek()) { - server.iterate(std::min(500ms, config::stream.ping_timeout)); - - if(session_state == state_e::STOPPING) { - BOOST_LOG(debug) << "Waiting for Audio to end..."sv; - session.audioThread.join(); - BOOST_LOG(debug) << "Waiting for Video to end..."sv; - session.videoThread.join(); - BOOST_LOG(debug) << "Waiting for Control to end..."sv; - session.controlThread.join(); - - BOOST_LOG(debug) << "Resetting Session..."sv << std::endl; - session.video_packets = video::packet_queue_t(); - session.audio_packets = audio::packet_queue_t(); - - input::reset(input); - - session_state.store(state_e::STOPPED); - } + return -1; } - if(session_state != state_e::STOPPED) { - // Wait until the session is properly running - while(session_state == state_e::STARTING) { - std::this_thread::sleep_for(1ms); - } + TUPLE_2D_REF(port, msg, *msg_opt); + if(msg != ping) { + BOOST_LOG(error) << "First message is not a PING"sv; - stop(session); - - BOOST_LOG(debug) << "Waiting for Audio to end..."sv; - session.audioThread.join(); - BOOST_LOG(debug) << "Waiting for Video to end..."sv; - session.videoThread.join(); - BOOST_LOG(debug) << "Waiting for Control to end..."sv; - session.controlThread.join(); - - input::reset(input); + return -1; } + + return port; } +void videoThread(std::shared_ptr session, std::string addr_str) { + auto fg = util::fail_guard([&]() { + stop(*session); + }); + + while(session->state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + + auto addr = asio::ip::make_address(addr_str); + + auto ref = broadcast.ref(); + auto port = recv_ping(ref, socket_e::video, addr, config::stream.ping_timeout); + if(port < 0) { + return; + } + + auto &idr_events = session->idr_events; + + session->video_peer.address(addr); + session->video_peer.port(port); + + video::capture(ref->video_packets, idr_events, session->config.monitor, session.get()); +} + +void audioThread(std::shared_ptr session, std::string addr_str) { + auto fg = util::fail_guard([&]() { + stop(*session); + }); + + while(session->state == state_e::STARTING) { + std::this_thread::sleep_for(1ms); + } + + auto addr = asio::ip::make_address(addr_str); + + auto ref = broadcast.ref(); + auto port = recv_ping(ref, socket_e::audio, addr, config::stream.ping_timeout); + if(port < 0) { + return; + } + + session->audio_peer.address(addr); + session->audio_peer.port(port); + + audio::capture(ref->audio_packets, session->config.audio, session.get()); +} } diff --git a/sunshine/stream.h b/sunshine/stream.h index 74da40a2..0bc49409 100644 --- a/sunshine/stream.h +++ b/sunshine/stream.h @@ -5,12 +5,32 @@ #ifndef SUNSHINE_STREAM_H #define SUNSHINE_STREAM_H -#include +#include -#include "crypto.h" #include "thread_safe.h" +#include "video.h" +#include "audio.h" +#include "crypto.h" + +namespace input { +struct input_t; +} namespace stream { +constexpr auto VIDEO_STREAM_PORT = 47998; +constexpr auto CONTROL_PORT = 47999; +constexpr auto AUDIO_STREAM_PORT = 48000; + +namespace asio = boost::asio; +namespace sys = boost::system; + +using asio::ip::tcp; +using asio::ip::udp; + +enum class socket_e : int { + video, + audio +}; enum class state_e : int { STOPPED, @@ -19,18 +39,64 @@ enum class state_e : int { RUNNING, }; -struct launch_session_t { +using message_queue_t = std::shared_ptr>>; +using message_queue_queue_t = std::shared_ptr>>; + +struct config_t { + audio::config_t audio; + video::config_t monitor; + int packetsize; + + bool sops; + std::optional gcmap; +}; + +struct broadcast_ctx_t { + safe::event_t shutdown_event; + + video::packet_queue_t video_packets; + audio::packet_queue_t audio_packets; + + message_queue_queue_t session_queue; + + std::thread video_thread; + std::thread audio_thread; + std::thread control_thread; + + std::thread recv_thread; + + asio::io_service io; + udp::socket video_sock { io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT) }; + udp::socket audio_sock { io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT) }; +}; + +struct session_t { + config_t config; + + std::thread audioThread; + std::thread videoThread; + + std::chrono::steady_clock::time_point pingTimeout; + + udp::endpoint video_peer; + udp::endpoint audio_peer; + + + + video::idr_event_t idr_events; + crypto::aes_t gcm_key; crypto::aes_t iv; - bool has_process; + std::atomic state; }; -extern safe::event_t launch_event; -extern std::atomic session_state; +void videoThread(std::shared_ptr session, std::string addr_str); +void audioThread(std::shared_ptr session, std::string addr_str); -void rtpThread(std::shared_ptr> shutdown_event); +void stop(session_t &session); +extern std::shared_ptr input; } #endif //SUNSHINE_STREAM_H diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index 24ceee01..41a16714 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "utility.h" @@ -50,6 +51,26 @@ public: return val; } + // pop and view shoud not be used interchangebly + template + status_t pop(std::chrono::duration delay) { + std::unique_lock ul{_lock}; + + if (!_continue) { + return util::false_v; + } + + while (!_status) { + if (!_continue || _cv.wait_for(ul, delay) == std::cv_status::timeout) { + return util::false_v; + } + } + + auto val = std::move(_status); + _status = util::false_v; + return val; + } + // pop and view shoud not be used interchangebly const status_t &view() { std::unique_lock ul{_lock}; @@ -119,6 +140,26 @@ public: return !_queue.empty(); } + template + status_t pop(std::chrono::duration delay) { + std::unique_lock ul{_lock}; + + if (!_continue) { + return util::false_v; + } + + while (_queue.empty()) { + if (!_continue || _cv.wait_for(ul, delay) == std::cv_status::timeout) { + return util::false_v; + } + } + + auto val = std::move(_queue.front()); + _queue.erase(std::begin(_queue)); + + return val; + } + status_t pop() { std::unique_lock ul{_lock}; @@ -165,6 +206,108 @@ private: std::vector _queue; }; +template +class shared_t { +public: + using element_type = T; + + using construct_f = std::function; + using destruct_f = std::function; + + struct ptr_t { + shared_t *owner; + + explicit ptr_t(shared_t *owner) : owner { owner } {} + + ptr_t(ptr_t &&ptr) noexcept { + owner = ptr.owner; + + ptr.owner = nullptr; + } + + ptr_t(const ptr_t &ptr) noexcept { + auto tmp = ptr.owner->ref(); + + owner = tmp.owner; + tmp.owner = nullptr; + } + + ptr_t &operator=(const ptr_t &ptr) noexcept { + return *this = std::move(*ptr.owner->ref()); + } + + ptr_t &operator=(ptr_t &&ptr) noexcept { + if(owner) { + release(); + } + + std::swap(owner, ptr.owner); + + return *this; + } + + ~ptr_t() { + if(owner) { + release(); + } + } + + operator bool () const { + return owner != nullptr; + } + + void release() { + std::lock_guard lg { owner->_lock }; + auto c = owner->_count.fetch_sub(1, std::memory_order_acquire); + + if(c - 1 == 0) { + owner->_destruct(*get()); + (*this)->~element_type(); + } + + owner = nullptr; + } + + element_type *get() const { + return reinterpret_cast(owner->_object_buf.data()); + } + + element_type *operator->() { + return reinterpret_cast(owner->_object_buf.data()); + } + }; + + template + shared_t(FC && fc, FD &&fd) : _construct { std::forward(fc) }, _destruct { std::forward(fd) } {} + [[nodiscard]] ptr_t ref() { + auto c = _count.fetch_add(1, std::memory_order_acquire); + if(!c) { + std::lock_guard lg { _lock }; + + new(_object_buf.data()) element_type; + if(_construct(*reinterpret_cast(_object_buf.data()))) { + return ptr_t { nullptr }; + } + } + + return ptr_t { this }; + } +private: + construct_f _construct; + destruct_f _destruct; + + std::array _object_buf; + + std::atomic _count; + std::mutex _lock; +}; + +template +auto make_shared(F_Construct &&fc, F_Destruct &&fd) { + return shared_t { + std::forward(fc), std::forward(fd) + }; +} } #endif //SUNSHINE_THREAD_SAFE_H diff --git a/sunshine/video.cpp b/sunshine/video.cpp index d83b6e6f..a9815d86 100644 --- a/sunshine/video.cpp +++ b/sunshine/video.cpp @@ -2,14 +2,15 @@ // Created by loki on 6/6/19. // +#include #include extern "C" { -#include #include } #include "platform/common.h" +#include "thread_pool.h" #include "config.h" #include "video.h" #include "main.h" @@ -32,9 +33,20 @@ void free_packet(AVPacket *packet) { using ctx_t = util::safe_ptr; using frame_t = util::safe_ptr; using sws_t = util::safe_ptr; -using img_event_t = std::shared_ptr>>; +using img_event_t = std::shared_ptr>>; -auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) { +struct capture_ctx_t { + img_event_t images; + std::chrono::nanoseconds delay; + std::chrono::steady_clock::time_point next_frame; +}; + +struct capture_thread_ctx_t { + std::shared_ptr> capture_ctx_queue; + std::thread capture_thread; +}; + +[[nodiscard]] auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) { avcodec_open2(ctx.get(), codec, options); return util::fail_guard([&]() { @@ -42,7 +54,87 @@ auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) { }); } -void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::img_t &img, packet_queue_t &packets) { +int capture_display(platf::img_t *img, std::unique_ptr &disp) { + auto status = disp->snapshot(img, display_cursor); + switch (status) { + case platf::capture_e::reinit: { + // We try this twice, in case we still get an error on reinitialization + for(int x = 0; x < 2; ++x) { + disp.reset(); + disp = platf::display(); + + if(disp) { + break; + } + + std::this_thread::sleep_for(200ms); + } + + if(!disp) { + return -1; + } + + return -1; + } + case platf::capture_e::error: + return -1; + // Prevent warning during compilation + case platf::capture_e::timeout: + case platf::capture_e::ok: + return 0; + default: + BOOST_LOG(error) << "Unrecognized capture status ["sv << (int)status << ']'; + return -1; + } +} + +void captureThread(std::shared_ptr> capture_ctx_queue) { + std::vector capture_ctxs; + + auto fg = util::fail_guard([&]() { + capture_ctx_queue->stop(); + + // Stop all sessions listening to this thread + for(auto &capture_ctx : capture_ctxs) { + capture_ctx.images->stop(); + } + for(auto &capture_ctx : capture_ctx_queue->unsafe()) { + capture_ctx.images->stop(); + } + + }); + + auto disp = platf::display(); + while(capture_ctx_queue->running()) { + while(capture_ctx_queue->peek()) { + capture_ctxs.emplace_back(std::move(*capture_ctx_queue->pop())); + } + + std::shared_ptr img = disp->alloc_img(); + auto has_error = capture_display(img.get(), disp); + if(has_error) { + return; + } + + auto time_point = std::chrono::steady_clock::now(); + for(auto capture_ctx = std::begin(capture_ctxs); capture_ctx != std::end(capture_ctxs); ++capture_ctx) { + if(!capture_ctx->images->running()) { + capture_ctx = capture_ctxs.erase(capture_ctx); + + continue; + } + + if(time_point > capture_ctx->next_frame) { + continue; + } + + capture_ctx->images->raise(img); + capture_ctx->next_frame = time_point + capture_ctx->delay; + } + } +} + +void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::img_t &img, packet_queue_t &packets, void *channel_data) { av_frame_make_writable(yuv_frame.get()); const int linesizes[2] { @@ -67,7 +159,7 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im } while (ret >= 0) { - packet_t packet { av_packet_alloc() }; + auto packet = std::make_unique(nullptr); ret = avcodec_receive_packet(ctx.get(), packet.get()); if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { @@ -79,17 +171,51 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im std::abort(); } + packet->channel_data = channel_data; packets->raise(std::move(packet)); } } -void encodeThread( - img_event_t images, +int start_capture(capture_thread_ctx_t &capture_thread_ctx) { + capture_thread_ctx.capture_ctx_queue = std::make_shared>(); + + capture_thread_ctx.capture_thread = std::thread { + captureThread, capture_thread_ctx.capture_ctx_queue + }; + + return 0; +} +void end_capture(capture_thread_ctx_t &capture_thread_ctx) { + capture_thread_ctx.capture_ctx_queue->stop(); + + capture_thread_ctx.capture_thread.join(); +} + +void capture( packet_queue_t packets, idr_event_t idr_events, - config_t config) { + config_t config, + void *channel_data) { + int framerate = config.framerate; + auto images = std::make_shared(); + // Keep a reference counter to ensure the capture thread only runs when other threads have a reference to the capture thread + static auto capture_thread = safe::make_shared(start_capture, end_capture); + auto ref = capture_thread.ref(); + if(!ref) { + return; + } + + ref->capture_ctx_queue->raise(capture_ctx_t { + images, std::chrono::floor(1s) / framerate, std::chrono::steady_clock::now() + }); + + if(!ref->capture_ctx_queue->running()) { + return; + } + + AVCodec *codec; if(config.videoFormat == 0) { @@ -217,7 +343,11 @@ void encodeThread( // Initiate scaling context with correct height and width sws_t sws; - while (auto img = images->pop()) { + while(auto img = images->pop()) { + if(!idr_events->running()) { + break; + } + auto new_width = img->width; auto new_height = img->height; @@ -250,68 +380,11 @@ void encodeThread( yuv_frame->pict_type = AV_PICTURE_TYPE_I; } - encode(frame++, ctx, sws, yuv_frame, *img, packets); + encode(frame++, ctx, sws, yuv_frame, *img, packets, channel_data); yuv_frame->pict_type = AV_PICTURE_TYPE_NONE; } -} - -void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config) { - display_cursor = true; - - int framerate = config.framerate; - - auto disp = platf::display(); - if(!disp) { - packets->stop(); - return; - } - - img_event_t images {new img_event_t::element_type }; - std::thread encoderThread { &encodeThread, images, packets, idr_events, config }; - - auto time_span = std::chrono::floor(1s) / framerate; - while(packets->running()) { - auto next_snapshot = std::chrono::steady_clock::now() + time_span; - - auto img = disp->alloc_img(); - auto status = disp->snapshot(img.get(), display_cursor); - - switch(status) { - case platf::capture_e::reinit: { - // We try this twice, in case we still get an error on reinitialization - for(int x = 0; x < 2; ++x) { - disp.reset(); - disp = platf::display(); - - if (disp) { - break; - } - - std::this_thread::sleep_for(200ms); - } - - if (!disp) { - packets->stop(); - } - continue; - } - case platf::capture_e::timeout: - std::this_thread::sleep_until(next_snapshot); - continue; - case platf::capture_e::error: - packets->stop(); - continue; - // Prevent warning during compilation - case platf::capture_e::ok: - break; - } - - images->raise(std::move(img)); - std::this_thread::sleep_until(next_snapshot); - } images->stop(); - encoderThread.join(); } } diff --git a/sunshine/video.h b/sunshine/video.h index 16025370..873b76bc 100644 --- a/sunshine/video.h +++ b/sunshine/video.h @@ -6,14 +6,37 @@ #define SUNSHINE_VIDEO_H #include "thread_safe.h" +#include "platform/common.h" + +extern "C" { +#include +} struct AVPacket; namespace video { void free_packet(AVPacket *packet); -using packet_t = util::safe_ptr; +struct packet_raw_t : public AVPacket { + template + explicit packet_raw_t(P *user_data) : channel_data { user_data } { + av_init_packet(this); + } + + explicit packet_raw_t(std::nullptr_t null) : channel_data { nullptr } { + av_init_packet(this); + } + + ~packet_raw_t() { + av_packet_unref(this); + } + + void *channel_data; +}; + +using packet_t = std::unique_ptr; using packet_queue_t = std::shared_ptr>; using idr_event_t = std::shared_ptr>>; +using img_event_t = std::shared_ptr>>; struct config_t { int width; @@ -27,7 +50,11 @@ struct config_t { int dynamicRange; }; -void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config); +void capture( + packet_queue_t packets, + idr_event_t idr_events, + config_t config, + void *channel_data); } #endif //SUNSHINE_VIDEO_H