diff --git a/sunshine/main.cpp b/sunshine/main.cpp index 59b49024..59e010e8 100644 --- a/sunshine/main.cpp +++ b/sunshine/main.cpp @@ -189,10 +189,12 @@ int main(int argc, char *argv[]) { task_pool.start(1); + bool shutdown_by_interrupt = false; + util::TaskPool::task_id_t force_shutdown = nullptr; // Create signal handler after logging has been initialized auto shutdown_event = mail::man->event(mail::shutdown); - on_signal(SIGINT, [&force_shutdown, shutdown_event]() { + on_signal(SIGINT, [&shutdown_by_interrupt, &force_shutdown, shutdown_event]() { BOOST_LOG(info) << "Interrupt handler called"sv; auto task = []() { @@ -202,7 +204,7 @@ int main(int argc, char *argv[]) { }; force_shutdown = task_pool.pushDelayed(task, 10s).task_id; - + shutdown_by_interrupt = true; shutdown_event->raise(true); }); @@ -219,7 +221,11 @@ int main(int argc, char *argv[]) { shutdown_event->raise(true); }); - auto exit_guard = util::fail_guard([&force_shutdown]() { + auto exit_guard = util::fail_guard([&shutdown_by_interrupt, &force_shutdown]() { + if(!shutdown_by_interrupt) { + return; + } + task_pool.cancel(force_shutdown); std::cout << "Sunshine exited: Press enter to continue"sv << std::endl; diff --git a/sunshine/nvhttp.cpp b/sunshine/nvhttp.cpp index 09362f0c..c1dd4183 100644 --- a/sunshine/nvhttp.cpp +++ b/sunshine/nvhttp.cpp @@ -33,7 +33,7 @@ using namespace std::literals; namespace nvhttp { -constexpr auto VERSION = "7.1.400.0"; +constexpr auto VERSION = "7.1.404.0"; constexpr auto GFE_VERSION = "3.12.0.1"; namespace fs = std::filesystem; @@ -675,7 +675,7 @@ void launch(bool &host_audio, resp_https_t response, req_https_t request) { stream::launch_session_raise(make_launch_session(host_audio, args)); tree.put("root..status_code", 200); - tree.put("root.sessionUrl0", "rtspru://"s + request->local_endpoint_address() + ':' + std::to_string(map_port(stream::RTSP_SETUP_PORT))); + tree.put("root.sessionUrl0", "rtsp://"s + request->local_endpoint_address() + ':' + std::to_string(map_port(stream::RTSP_SETUP_PORT))); tree.put("root.gamesession", 1); } diff --git a/sunshine/rtsp.cpp b/sunshine/rtsp.cpp index 85825973..ab86a1db 100644 --- a/sunshine/rtsp.cpp +++ b/sunshine/rtsp.cpp @@ -2,10 +2,17 @@ // Created by loki on 2/2/20. // +#define BOOST_BIND_GLOBAL_PLACEHOLDERS + extern "C" { #include } +#include + +#include +#include + #include "config.h" #include "input.h" #include "main.h" @@ -22,15 +29,6 @@ using asio::ip::udp; using namespace std::literals; namespace stream { - -//FIXME: Quick and dirty workaround for bug in MinGW 9.3 causing a linker error when using std::to_string -template -std::string to_string(T &&t) { - std::stringstream ss; - ss << std::forward(t); - return ss.str(); -} - void free_msg(PRTSP_MESSAGE msg) { freeMessage(msg); @@ -40,20 +38,108 @@ void free_msg(PRTSP_MESSAGE msg) { class rtsp_server_t; using msg_t = util::safe_ptr; -using cmd_func_t = std::function; +using cmd_func_t = std::function; void print_msg(PRTSP_MESSAGE msg); -void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t &&req); +void cmd_not_found(tcp::socket &sock, msg_t &&req); + +class socket_t : public std::enable_shared_from_this { +public: + socket_t(boost::asio::io_service &ios, std::function &&handle_data_fn) + : handle_data_fn { std::move(handle_data_fn) }, sock { ios } {} + + void read() { + sock.async_read_some( + boost::asio::buffer(msg_buf.data(), msg_buf.size()), + boost::bind( + &socket_t::handle_read, shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + static void handle_read(std::shared_ptr &socket, const boost::system::error_code &ec, std::size_t bytes) { + BOOST_LOG(debug) << "Handle read of size: "sv << bytes << " bytes"sv; + + if(ec) { + BOOST_LOG(error) << "RTSP: Couldn't read from tcp socket: "sv << ec.message(); + + boost::system::error_code ec; + socket->sock.close(ec); + + if(ec) { + BOOST_LOG(error) << "RTSP: Couldn't close tcp socket: "sv << ec.message(); + } + + return; + } + + auto fg = util::fail_guard([&socket]() { + socket->sock.close(); + }); + + msg_t req { new msg_t::element_type }; + + auto &incomplete = socket->incomplete; + if(incomplete.empty()) { + parseRtspMessage(req.get(), socket->msg_buf.data(), bytes); + + for(auto option = req->options; option != nullptr; option = option->next) { + if("Content-length"sv == option->option) { + BOOST_LOG(debug) << "Found Content-Length: "sv << option->content << " bytes"sv; + + // If content_length > bytes read, then we need to store current data read, + // to be appended by the next read. + auto content_length = util::from_view(option->content); + if(content_length <= bytes) { + break; + } + + auto incomplete_size = incomplete.size(); + incomplete.resize(incomplete.size() + bytes); + + std::copy_n(socket->msg_buf.data(), bytes, std::begin(incomplete) + incomplete_size); + + socket->read(); + + fg.disable(); + return; + } + } + } + else { + auto incomplete_size = incomplete.size(); + incomplete.resize(incomplete.size() + bytes); + + std::copy_n(socket->msg_buf.data(), bytes, std::begin(incomplete) + incomplete_size); + + parseRtspMessage(req.get(), incomplete.data(), incomplete.size()); + } + + print_msg(req.get()); + + socket->handle_data(std::move(req)); + } + + void handle_data(msg_t &&req) { + handle_data_fn(sock, std::move(req)); + } + + std::function handle_data_fn; + + tcp::socket sock; + + std::vector incomplete; + + std::array msg_buf; +}; class rtsp_server_t { public: ~rtsp_server_t() { - if(_host) { - clear(); - } + clear(); } - int bind(std::uint16_t port) { + int bind(std::uint16_t port, boost::system::error_code &ec) { { auto lg = _session_slots.lock(); @@ -61,9 +147,70 @@ public: _slot_count = config::stream.channels; } - _host = net::host_create(_addr, 1, port); + acceptor.open(tcp::v4(), ec); + if(ec) { + return -1; + } - return !(bool)_host; + acceptor.bind(tcp::endpoint(tcp::v4(), port), ec); + if(ec) { + return -1; + } + + acceptor.listen(4096, ec); + if(ec) { + return -1; + } + + next_socket = std::make_shared(ios, [this](tcp::socket &sock, msg_t &&msg) { + handle_msg(sock, std::move(msg)); + }); + + acceptor.async_accept(next_socket->sock, [this](const auto &ec) { + handle_accept(ec); + }); + + return 0; + } + + template + void iterate(std::chrono::duration timeout) { + ios.run_one_for(timeout); + } + + void handle_msg(tcp::socket &sock, msg_t &&req) { + auto func = _map_cmd_cb.find(req->message.request.command); + if(func != std::end(_map_cmd_cb)) { + func->second(this, sock, std::move(req)); + } + else { + cmd_not_found(sock, std::move(req)); + } + } + + void handle_accept(const boost::system::error_code &ec) { + if(ec) { + BOOST_LOG(error) << "Couldn't accept incoming connections: "sv << ec.message(); + + //Stop server + clear(); + return; + } + + auto socket = std::move(next_socket); + socket->read(); + + next_socket = std::make_shared(ios, [this](tcp::socket &sock, msg_t &&msg) { + handle_msg(sock, std::move(msg)); + }); + + acceptor.async_accept(next_socket->sock, [this](const auto &ec) { + handle_accept(ec); + }); + } + + void map(const std::string_view &type, cmd_func_t cb) { + _map_cmd_cb.emplace(type, std::move(cb)); } void session_raise(launch_session_t launch_session) { @@ -76,73 +223,7 @@ public: return config::stream.channels - _slot_count; } - template - void iterate(std::chrono::duration timeout) { - ENetEvent event; - auto res = enet_host_service(_host.get(), &event, std::chrono::floor(timeout).count()); - - if(res > 0) { - switch(event.type) { - case ENET_EVENT_TYPE_RECEIVE: { - net::packet_t packet { event.packet }; - net::peer_t peer { event.peer }; - - msg_t req { new msg_t::element_type }; - - //TODO: compare addresses of the peers - if(_queue_packet.second == nullptr) { - parseRtspMessage(req.get(), (char *)packet->data, packet->dataLength); - for(auto option = req->options; option != nullptr; option = option->next) { - if("Content-length"sv == option->option) { - _queue_packet = std::make_pair(peer, std::move(packet)); - return; - } - } - } - else { - std::vector full_payload; - - auto old_msg = std::move(_queue_packet); - auto &old_packet = old_msg.second; - - std::string_view new_payload { (char *)packet->data, packet->dataLength }; - std::string_view old_payload { (char *)old_packet->data, old_packet->dataLength }; - full_payload.resize(new_payload.size() + old_payload.size()); - - std::copy(std::begin(old_payload), std::end(old_payload), std::begin(full_payload)); - std::copy(std::begin(new_payload), std::end(new_payload), std::begin(full_payload) + old_payload.size()); - - parseRtspMessage(req.get(), full_payload.data(), full_payload.size()); - } - - print_msg(req.get()); - - msg_t resp; - auto func = _map_cmd_cb.find(req->message.request.command); - if(func != std::end(_map_cmd_cb)) { - func->second(this, peer, std::move(req)); - } - else { - cmd_not_found(host(), peer, std::move(req)); - } - - return; - } - case ENET_EVENT_TYPE_CONNECT: - BOOST_LOG(info) << "CLIENT CONNECTED TO RTSP"sv; - break; - case ENET_EVENT_TYPE_DISCONNECT: - BOOST_LOG(info) << "CLIENT DISCONNECTED FROM RTSP"sv; - break; - case ENET_EVENT_TYPE_NONE: - break; - } - } - } - - void map(const std::string_view &type, cmd_func_t cb) { - _map_cmd_cb.emplace(type, std::move(cb)); - } + safe::event_t launch_event; void clear(bool all = true) { auto lg = _session_slots.lock(); @@ -158,10 +239,8 @@ public: } } - if(all) { - std::for_each(_host->peers, _host->peers + _host->peerCount, [](auto &peer) { - enet_peer_disconnect_now(&peer, 0); - }); + if(all && !ios.stopped()) { + ios.stop(); } } @@ -186,24 +265,17 @@ public: return nullptr; } - net::host_t::pointer host() const { - return _host.get(); - } - - safe::event_t launch_event; - private: - // named _queue_packet because I want to make it an actual queue - // It's like this for convenience sake - std::pair _queue_packet; - std::unordered_map _map_cmd_cb; util::sync_t>> _session_slots; int _slot_count; - ENetAddress _addr; - net::host_t _host; + + boost::asio::io_service ios; + tcp::acceptor acceptor { ios }; + + std::shared_ptr next_socket; }; rtsp_server_t server; @@ -219,9 +291,26 @@ int session_count() { return server.session_count(); } -void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) { +int send(tcp::socket &sock, const std::string_view &sv) { + std::size_t bytes_send = 0; + + while(bytes_send != sv.size()) { + boost::system::error_code ec; + bytes_send += sock.send(boost::asio::buffer(sv.substr(bytes_send)), 0, ec); + + if(ec) { + BOOST_LOG(error) << "RTSP: Couldn't send data over tcp socket: "sv << ec.message(); + return -1; + } + } + + return 0; +} + +void respond(tcp::socket &sock, msg_t &resp) { auto payload = std::make_pair(resp->payload, resp->payloadLength); + // Restore response message for proper destruction auto lg = util::fail_guard([&]() { resp->payload = payload.first; resp->payloadLength = payload.second; @@ -239,57 +328,44 @@ void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) { << "---End Response---"sv << std::endl; std::string_view tmp_resp { raw_resp.get(), (size_t)serialized_len }; - { - auto packet = enet_packet_create(tmp_resp.data(), tmp_resp.size(), ENET_PACKET_FLAG_RELIABLE); - if(enet_peer_send(peer, 0, packet)) { - enet_packet_destroy(packet); - return; - } - enet_host_flush(host); + if(send(sock, tmp_resp)) { + return; } - if(payload.second > 0) { - auto packet = enet_packet_create(payload.first, payload.second, ENET_PACKET_FLAG_RELIABLE); - if(enet_peer_send(peer, 0, packet)) { - enet_packet_destroy(packet); - return; - } - - enet_host_flush(host); - } + send(sock, std::string_view { payload.first, (std::size_t)payload.second }); } -void respond(net::host_t::pointer host, net::peer_t peer, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { +void respond(tcp::socket &sock, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) { msg_t resp { new msg_t::element_type }; createRtspResponse(resp.get(), nullptr, 0, const_cast("RTSP/1.0"), statuscode, const_cast(status_msg), seqn, options, const_cast(payload.data()), (int)payload.size()); - respond(host, peer, resp); + respond(sock, resp); } -void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t &&req) { - respond(host, peer, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); +void cmd_not_found(tcp::socket &sock, msg_t &&req) { + respond(sock, nullptr, 404, "NOT FOUND", req->sequenceNumber, {}); } -void cmd_option(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { +void cmd_option(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified option.option = const_cast("CSeq"); - auto seqn_str = to_string(req->sequenceNumber); + auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, &option, 200, "OK", req->sequenceNumber, {}); } -void cmd_describe(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { +void cmd_describe(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified option.option = const_cast("CSeq"); - auto seqn_str = to_string(req->sequenceNumber); + auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); std::stringstream ss; @@ -324,10 +400,10 @@ void cmd_describe(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { ss << std::endl; } - respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, ss.str()); + respond(sock, &option, 200, "OK", req->sequenceNumber, ss.str()); } -void cmd_setup(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { +void cmd_setup(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { OPTION_ITEM options[3] {}; auto &seqn = options[0]; @@ -336,7 +412,7 @@ void cmd_setup(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { seqn.option = const_cast("CSeq"); - auto seqn_str = to_string(req->sequenceNumber); + auto seqn_str = std::to_string(req->sequenceNumber); seqn.content = const_cast(seqn_str.c_str()); std::string_view target { req->message.request.target }; @@ -355,7 +431,7 @@ void cmd_setup(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { port = map_port(stream::CONTROL_PORT); } else { - cmd_not_found(server->host(), peer, std::move(req)); + cmd_not_found(sock, std::move(req)); return; } @@ -374,22 +450,22 @@ void cmd_setup(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { port_option.content = port_value.data(); - respond(server->host(), peer, &seqn, 200, "OK", req->sequenceNumber, {}); + respond(sock, &seqn, 200, "OK", req->sequenceNumber, {}); } -void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { +void cmd_announce(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified option.option = const_cast("CSeq"); - auto seqn_str = to_string(req->sequenceNumber); + auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); if(!server->launch_event.peek()) { // /launch has not been used - respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); + respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {}); return; } auto launch_session { server->launch_event.pop() }; @@ -473,14 +549,14 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { } catch(std::out_of_range &) { - respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } if(config.monitor.videoFormat != 0 && config::video.hevc_mode == 1) { BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv; - respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); + respond(sock, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } @@ -490,31 +566,31 @@ void cmd_announce(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { if(!slot) { BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']'; - respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); + respond(sock, &option, 503, "Service Unavailable", req->sequenceNumber, {}); return; } - if(session::start(*session, platf::from_sockaddr((sockaddr *)&peer->address.address))) { + if(session::start(*session, sock.remote_endpoint().address().to_string())) { BOOST_LOG(error) << "Failed to start a streaming session"sv; server->clear(slot); - respond(server->host(), peer, &option, 500, "Internal Server Error", req->sequenceNumber, {}); + respond(sock, &option, 500, "Internal Server Error", req->sequenceNumber, {}); return; } - respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, &option, 200, "OK", req->sequenceNumber, {}); } -void cmd_play(rtsp_server_t *server, net::peer_t peer, msg_t &&req) { +void cmd_play(rtsp_server_t *server, tcp::socket &sock, msg_t &&req) { OPTION_ITEM option {}; // I know these string literals will not be modified option.option = const_cast("CSeq"); - auto seqn_str = to_string(req->sequenceNumber); + auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {}); + respond(sock, &option, 200, "OK", req->sequenceNumber, {}); } void rtpThread() { @@ -528,8 +604,9 @@ void rtpThread() { server.map("PLAY"sv, &cmd_play); - if(server.bind(map_port(RTSP_SETUP_PORT))) { - BOOST_LOG(fatal) << "Couldn't bind RTSP server to port ["sv << map_port(RTSP_SETUP_PORT) << "], likely another process already bound to the port"sv; + boost::system::error_code ec; + if(server.bind(map_port(RTSP_SETUP_PORT), ec)) { + BOOST_LOG(fatal) << "Couldn't bind RTSP server to port ["sv << map_port(RTSP_SETUP_PORT) << "], " << ec.message(); shutdown_event->raise(true); return;