Compile for Multicasting

loki 2020-02-08 16:26:38 +01:00
parent 753f57c71b
commit 5cd0fd76bf
21 changed files with 1259 additions and 824 deletions

View File

@ -128,6 +128,8 @@ set(SUNSHINE_TARGET_FILES
sunshine/crypto.h
sunshine/nvhttp.cpp
sunshine/nvhttp.h
sunshine/rtsp.cpp
sunshine/rtsp.h
sunshine/stream.cpp
sunshine/stream.h
sunshine/video.cpp

View File

@ -50,6 +50,14 @@
# The value must be greater than 0 and lower than or equal to 100
# fec_percentage = 10
# When multicasting, it can be useful to have a different configuration for each connected client.
# For example:
# Clients connected through WAN and LAN have different bitrate constraints.
# Decoders may require different color settings.
#
# Unlike simply broadcasting to multiple clients, this generates distinct video streams.
# Note: CPU usage increases for each distinct video stream generated.
# channels = 1
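# Example (illustrative, not part of the committed defaults):
# channels = 2 would let Sunshine produce two distinct encodes, e.g. a high-bitrate
# stream for a LAN client and a lower-bitrate stream for a WAN client.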
# The back/select button on the controller
# On the Shield, the home and power buttons are not passed to Moonlight

View File

@ -50,7 +50,7 @@ static opus_stream_config_t HighSurround51 = {
map_high_surround51
};
void encodeThread(std::shared_ptr<safe::queue_t<packet_t>> packets, sample_queue_t samples, config_t config) {
void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t config, void *channel_data) {
//FIXME: Pick correct opus_stream_config_t based on config.channels
auto stream = &stereo;
opus_t opus { opus_multistream_encoder_create(
@ -76,13 +76,13 @@ void encodeThread(std::shared_ptr<safe::queue_t<packet_t>> packets, sample_queue
}
packet.fake_resize(bytes);
packets->raise(std::move(packet));
packets->raise(std::make_pair(channel_data, std::move(packet)));
}
}
void capture(std::shared_ptr<safe::queue_t<packet_t>> packets, config_t config) {
void capture(packet_queue_t packets, config_t config, void *channel_data) {
auto samples = std::make_shared<sample_queue_t::element_type>();
std::thread thread { encodeThread, packets, samples, config };
std::thread thread { encodeThread, packets, samples, config, channel_data };
auto fg = util::fail_guard([&]() {
packets->stop();

View File

@ -11,8 +11,8 @@ struct config_t {
};
using packet_t = util::buffer_t<std::uint8_t>;
using packet_queue_t = std::shared_ptr<safe::queue_t<packet_t>>;
void capture(std::shared_ptr<safe::queue_t<packet_t>> packets, config_t config);
using packet_queue_t = std::shared_ptr<safe::queue_t<std::pair<void*, packet_t>>>;
void capture(packet_queue_t packets, config_t config, void *channel_data);
}
#endif
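The audio queue now carries a std::pair of channel_data and the encoded packet. A minimal consumer sketch, assuming a pop() that yields the queued pair (the real consumer lives in stream.cpp, whose diff is suppressed further down):

while(auto packet = packets->pop()) {
  auto &channel_data = packet->first;   // the pointer handed to audio::capture()
  auto &opus_packet  = packet->second;  // encoded Opus frame (util::buffer_t)
  // look up the session identified by channel_data and send opus_packet to it
}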

View File

@ -35,7 +35,8 @@ stream_t stream {
APPS_JSON_PATH,
10 // fecPercentage
10, // fecPercentage
1 // channels
};
nvhttp_t nvhttp {
@ -184,10 +185,15 @@ void parse_file(const char *file) {
});
int to = -1;
int_f(vars, "ping_timeout", to);
if(to > 0) {
stream.ping_timeout = std::chrono::milliseconds(to);
}
int_between_f(vars, "ping_timeout", to, {
-1, std::numeric_limits<int>::max()
});
stream.ping_timeout = std::chrono::milliseconds(to);
int_between_f(vars, "channels", stream.channels, {
1, std::numeric_limits<int>::max()
});
string_f(vars, "file_apps", stream.file_apps);
int_between_f(vars, "fec_percentage", stream.fec_percentage, {
1, 100

View File

@ -30,6 +30,9 @@ struct stream_t {
std::string file_apps;
int fec_percentage;
// max unique instances of video and audio streams
int channels;
};
struct nvhttp_t {

View File

@ -15,7 +15,7 @@
#include <boost/log/sources/severity_logger.hpp>
#include "nvhttp.h"
#include "stream.h"
#include "rtsp.h"
#include "config.h"
#include "thread_pool.h"
@ -124,6 +124,12 @@ int main(int argc, char *argv[]) {
return 7;
}
{
proc::ctx_t ctx;
ctx.name = "Desktop"s;
proc_opt->get_apps().emplace(std::begin(proc_opt->get_apps()), std::move(ctx));
}
proc::proc = std::move(*proc_opt);
auto deinit_guard = platf::init();

View File

@ -93,4 +93,23 @@ std::string_view to_enum_string(net_e net) {
// avoid warning
return "wan"sv;
}
host_t host_create(ENetAddress &addr, std::size_t peers, std::uint16_t port) {
enet_address_set_host(&addr, "0.0.0.0");
enet_address_set_port(&addr, port);
return host_t { enet_host_create(PF_INET, &addr, peers, 1, 0, 0) };
}
void free_host(ENetHost *host) {
std::for_each(host->peers, host->peers + host->peerCount, [](ENetPeer &peer_ref) {
ENetPeer *peer = &peer_ref;
if(peer) {
enet_peer_disconnect_now(peer, 0);
}
});
enet_host_destroy(host);
}
}
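A usage sketch for the new helpers; the peer count and port (48010, RTSP_SETUP_PORT) follow how rtsp.cpp constructs its server further down, and free_host is already bound as the deleter of net::host_t:

ENetAddress addr;
auto host = net::host_create(addr, 1 /* peers */, 48010);
if(!host) {
  // enet_host_create failed
}
// when host goes out of scope, free_host() disconnects every peer and destroys the ENetHost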

View File

@ -6,7 +6,18 @@
#define SUNSHINE_NETWORK_H
#include <tuple>
#include <enet/enet.h>
#include "utility.h"
namespace net {
void free_host(ENetHost *host);
using host_t = util::safe_ptr<ENetHost, free_host>;
using peer_t = ENetPeer*;
using packet_t = util::safe_ptr<ENetPacket, enet_packet_destroy>;
enum net_e : int {
PC,
LAN,
@ -17,6 +28,8 @@ net_e from_enum_string(const std::string_view &view);
std::string_view to_enum_string(net_e net);
net_e from_address(const std::string_view &view);
host_t host_create(ENetAddress &addr, std::size_t peers, std::uint16_t port);
}
#endif //SUNSHINE_NETWORK_H

View File

@ -18,7 +18,8 @@
#include "config.h"
#include "utility.h"
#include "stream.h"
#include "rtsp.h"
#include "crypto.h"
#include "nvhttp.h"
#include "platform/common.h"
#include "network.h"
@ -504,17 +505,14 @@ void applist(resp_https_t response, req_https_t request) {
pt::ptree desktop;
apps.put("<xmlattr>.status_code", 200);
desktop.put("IsHdrSupported"s, config::video.hevc_mode == 2 ? 1 : 0);
desktop.put("AppTitle"s, "Desktop");
desktop.put("ID"s, 1);
int x = 2;
int x = 0;
for(auto &proc : proc::proc.get_apps()) {
pt::ptree app;
app.put("IsHdrSupported"s, config::video.hevc_mode == 2 ? 1 : 0);
app.put("AppTitle"s, proc.name);
app.put("ID"s, x++);
app.put("ID"s, ++x);
apps.push_back(std::make_pair("App", std::move(app)));
}
@ -536,17 +534,15 @@ void launch(resp_https_t response, req_https_t request) {
auto args = request->parse_query_string();
auto appid = util::from_view(args.at("appid")) -2;
stream::launch_session_t launch_session;
if(stream::session_state != stream::state_e::STOPPED) {
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.gamesession", 0);
auto current_appid = proc::proc.running();
if(current_appid != -1) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 400);
return;
}
auto current_appid = proc::proc.running();
if(appid >= 0 && appid != current_appid) {
if(appid >= 0) {
auto err = proc::proc.execute(appid);
if(err) {
tree.put("root.<xmlattr>.status_code", err);
@ -554,12 +550,9 @@ void launch(resp_https_t response, req_https_t request) {
return;
}
current_appid = appid;
}
// Needed to determine if session must be closed when no process is running in proc::proc
launch_session.has_process = current_appid >= 0;
stream::launch_session_t launch_session;
auto clientID = args.at("uniqueid"s);
launch_session.gcm_key = *util::from_hex<crypto::aes_t>(args.at("rikey"s), true);
@ -571,14 +564,6 @@ void launch(resp_https_t response, req_https_t request) {
stream::launch_event.raise(launch_session);
/*
bool sops = args.at("sops"s) == "1";
std::optional<int> gcmap { std::nullopt };
if(auto it = args.find("gcmap"s); it != std::end(args)) {
gcmap = std::stoi(it->second);
}
*/
tree.put("root.<xmlattr>.status_code", 200);
tree.put("root.gamesession", 1);
}
@ -595,7 +580,7 @@ void resume(resp_https_t response, req_https_t request) {
});
auto current_appid = proc::proc.running();
if(current_appid == -1 || stream::session_state != stream::state_e::STOPPED) {
if(current_appid == -1) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
@ -603,8 +588,6 @@ void resume(resp_https_t response, req_https_t request) {
}
stream::launch_session_t launch_session;
// Needed to determine if session must be closed when no process is running in proc::proc
launch_session.has_process = current_appid >= 0;
auto args = request->parse_query_string();
auto clientID = args.at("uniqueid"s);
@ -639,13 +622,6 @@ void cancel(resp_https_t response, req_https_t request) {
return;
}
if(stream::session_state != stream::state_e::STOPPED) {
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.cancel", 0);
return;
}
proc::proc.terminate();
tree.put("root.cancel", 1);

View File

@ -8,6 +8,7 @@
#include <string>
#include "sunshine/utility.h"
struct sockaddr;
namespace platf {
constexpr auto MAX_GAMEPADS = 2;
@ -86,6 +87,8 @@ using input_t = util::safe_ptr<void, freeInput>;
std::string get_mac_address(const std::string_view &address);
std::string from_sockaddr(const sockaddr *const);
std::unique_ptr<mic_t> microphone(std::uint32_t sample_rate);
std::unique_ptr<display_t> display();
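A sketch of the new from_sockaddr() helper in use; the cast mirrors the call rtsp.cpp makes when it spawns per-session audio and video threads (peer is a net::peer_t):

auto addr_string = platf::from_sockaddr((sockaddr *)&peer->address.address);
BOOST_LOG(info) << "peer address :: "sv << addr_string;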

View File

@ -86,16 +86,16 @@ public:
client_t client;
};
std::string from_socket_address(const SOCKET_ADDRESS &socket_address) {
std::string from_sockaddr(const sockaddr *const socket_address) {
char data[INET6_ADDRSTRLEN];
auto family = socket_address.lpSockaddr->sa_family;
auto family = socket_address->sa_family;
if(family == AF_INET6) {
inet_ntop(AF_INET6, &((sockaddr_in6*)socket_address.lpSockaddr)->sin6_addr, data, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &((sockaddr_in6*)socket_address)->sin6_addr, data, INET6_ADDRSTRLEN);
}
if(family == AF_INET) {
inet_ntop(AF_INET, &((sockaddr_in*)socket_address.lpSockaddr)->sin_addr, data, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &((sockaddr_in*)socket_address)->sin_addr, data, INET_ADDRSTRLEN);
}
return std::string { data };
@ -116,7 +116,7 @@ std::string get_mac_address(const std::string_view &address) {
adapteraddrs_t info = get_adapteraddrs();
for(auto adapter_pos = info.get(); adapter_pos != nullptr; adapter_pos = adapter_pos->Next) {
for(auto addr_pos = adapter_pos->FirstUnicastAddress; addr_pos != nullptr; addr_pos = addr_pos->Next) {
if(adapter_pos->PhysicalAddressLength != 0 && address == from_socket_address(addr_pos->Address)) {
if(adapter_pos->PhysicalAddressLength != 0 && address == from_sockaddr(addr_pos->Address.lpSockaddr)) {
std::stringstream mac_addr;
mac_addr << std::hex;
for(int i = 0; i < adapter_pos->PhysicalAddressLength; i++) {

View File

@ -163,6 +163,9 @@ void proc_t::terminate() {
const std::vector<ctx_t> &proc_t::get_apps() const {
return _apps;
}
std::vector<ctx_t> &proc_t::get_apps() {
return _apps;
}
proc_t::~proc_t() {
terminate();

View File

@ -67,6 +67,8 @@ public:
~proc_t();
const std::vector<ctx_t> &get_apps() const;
std::vector<ctx_t> &get_apps();
void terminate();
private:

sunshine/rtsp.cpp (new file, 488 lines)
View File

@ -0,0 +1,488 @@
//
// Created by loki on 2/2/20.
//
extern "C" {
#include <moonlight-common-c/src/Rtsp.h>
}
#include "config.h"
#include "main.h"
#include "network.h"
#include "rtsp.h"
#include "input.h"
#include "stream.h"
namespace asio = boost::asio;
using asio::ip::tcp;
using asio::ip::udp;
using namespace std::literals;
namespace stream {
constexpr auto RTSP_SETUP_PORT = 48010;
void free_msg(PRTSP_MESSAGE msg) {
freeMessage(msg);
delete msg;
}
class rtsp_server_t;
using msg_t = util::safe_ptr<RTSP_MESSAGE, free_msg>;
using cmd_func_t = std::function<void(rtsp_server_t *, const std::shared_ptr<session_t> &, net::peer_t, msg_t&&)>;
safe::event_t<launch_session_t> launch_event;
void print_msg(PRTSP_MESSAGE msg);
void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req);
class rtsp_server_t {
public:
rtsp_server_t(rtsp_server_t &&) noexcept = default;
rtsp_server_t &operator=(rtsp_server_t &&) noexcept = default;
explicit rtsp_server_t(std::uint16_t port) : _session_slots (config::stream.channels), _host {net::host_create(_addr, 1, port) } {}
template<class T, class X>
void iterate(std::chrono::duration<T, X> timeout) {
ENetEvent event;
auto res = enet_host_service(_host.get(), &event, std::chrono::floor<std::chrono::milliseconds>(timeout).count());
if (res > 0) {
switch (event.type) {
case ENET_EVENT_TYPE_RECEIVE: {
net::packet_t packet{event.packet};
net::peer_t peer{event.peer};
msg_t req { new msg_t::element_type };
//TODO: compare addresses of the peers
if (_queue_packet.second == nullptr) {
parseRtspMessage(req.get(), (char *) packet->data, packet->dataLength);
for (auto option = req->options; option != nullptr; option = option->next) {
if ("Content-length"sv == option->option) {
_queue_packet = std::make_pair(peer, std::move(packet));
return;
}
}
}
else {
std::vector<char> full_payload;
auto old_msg = std::move(_queue_packet);
TUPLE_2D_REF(_, old_packet, old_msg);
std::string_view new_payload{(char *) packet->data, packet->dataLength};
std::string_view old_payload{(char *) old_packet->data, old_packet->dataLength};
full_payload.resize(new_payload.size() + old_payload.size());
std::copy(std::begin(old_payload), std::end(old_payload), std::begin(full_payload));
std::copy(std::begin(new_payload), std::end(new_payload), std::begin(full_payload) + old_payload.size());
parseRtspMessage(req.get(), full_payload.data(), full_payload.size());
}
print_msg(req.get());
msg_t resp;
auto func = _map_cmd_cb.find(req->message.request.command);
if (func != std::end(_map_cmd_cb)) {
func->second(this, nullptr, peer, std::move(req));
}
else {
cmd_not_found(host(), peer, std::move(req));
}
return;
}
case ENET_EVENT_TYPE_CONNECT:
BOOST_LOG(info) << "CLIENT CONNECTED TO RTSP"sv;
break;
case ENET_EVENT_TYPE_DISCONNECT:
BOOST_LOG(info) << "CLIENT DISCONNECTED FROM RTSP"sv;
break;
case ENET_EVENT_TYPE_NONE:
break;
}
}
}
void map(const std::string_view &type, cmd_func_t cb) {
_map_cmd_cb.emplace(type, std::move(cb));
}
void stop() {
for(auto &slot : _session_slots) {
auto session = slot.lock();
if (!session) {
continue;
}
// Wait until the session is properly running
while (session->state == state_e::STARTING) {
std::this_thread::sleep_for(1ms);
}
::stream::stop(*session);
BOOST_LOG(debug) << "Waiting for Audio to end..."sv;
session->audioThread.join();
BOOST_LOG(debug) << "Waiting for Video to end..."sv;
session->videoThread.join();
input::reset(input);
}
}
bool accept(const std::shared_ptr<session_t> &session) {
for(auto &slot : _session_slots) {
if(slot.expired()) {
slot = session;
return true;
}
}
return false;
}
net::host_t::pointer host() const {
return _host.get();
}
private:
// named _queue_packet because I want to make it an actual queue
// It's like this for convenience's sake
std::pair<net::peer_t, net::packet_t> _queue_packet;
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;
std::vector<std::weak_ptr<session_t>> _session_slots;
ENetAddress _addr;
net::host_t _host;
};
void respond(net::host_t::pointer host, net::peer_t peer, msg_t &resp) {
auto payload = std::make_pair(resp->payload, resp->payloadLength);
auto lg = util::fail_guard([&]() {
resp->payload = payload.first;
resp->payloadLength = payload.second;
});
resp->payload = nullptr;
resp->payloadLength = 0;
int serialized_len;
util::c_ptr<char> raw_resp { serializeRtspMessage(resp.get(), &serialized_len) };
BOOST_LOG(debug)
<< "---Begin Response---"sv << std::endl
<< std::string_view { raw_resp.get(), (std::size_t)serialized_len } << std::endl
<< std::string_view { payload.first, (std::size_t)payload.second } << std::endl
<< "---End Response---"sv << std::endl;
std::string_view tmp_resp { raw_resp.get(), (size_t)serialized_len };
{
auto packet = enet_packet_create(tmp_resp.data(), tmp_resp.size(), ENET_PACKET_FLAG_RELIABLE);
if(enet_peer_send(peer, 0, packet)) {
enet_packet_destroy(packet);
return;
}
enet_host_flush(host);
}
if(payload.second > 0) {
auto packet = enet_packet_create(payload.first, payload.second, ENET_PACKET_FLAG_RELIABLE);
if(enet_peer_send(peer, 0, packet)) {
enet_packet_destroy(packet);
return;
}
enet_host_flush(host);
}
}
void respond(net::host_t::pointer host, net::peer_t peer, POPTION_ITEM options, int statuscode, const char *status_msg, int seqn, const std::string_view &payload) {
msg_t resp { new msg_t::element_type };
createRtspResponse(resp.get(), nullptr, 0, const_cast<char*>("RTSP/1.0"), statuscode, const_cast<char*>(status_msg), seqn, options, const_cast<char*>(payload.data()), (int)payload.size());
respond(host, peer, resp);
}
void cmd_not_found(net::host_t::pointer host, net::peer_t peer, msg_t&& req) {
respond(host, peer, nullptr, 404, "NOT FOUND", req->sequenceNumber, {});
}
void cmd_option(rtsp_server_t *server, const std::shared_ptr<session_t> &session, net::peer_t peer, msg_t&& req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
option.option = const_cast<char*>("CSeq");
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str());
respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {});
}
void cmd_describe(rtsp_server_t *server, const std::shared_ptr<session_t> &session, net::peer_t peer, msg_t&& req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
option.option = const_cast<char*>("CSeq");
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str());
std::string_view payload;
if(config::video.hevc_mode == 0) {
payload = "surround-params=NONE"sv;
}
else {
payload = "sprop-parameter-sets=AAAAAU;surround-params=NONE"sv;
}
respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, payload);
}
void cmd_setup(rtsp_server_t *server, const std::shared_ptr<session_t> &session, net::peer_t peer, msg_t &&req) {
OPTION_ITEM options[2] {};
auto &seqn = options[0];
auto &session_option = options[1];
seqn.option = const_cast<char*>("CSeq");
auto seqn_str = std::to_string(req->sequenceNumber);
seqn.content = const_cast<char*>(seqn_str.c_str());
if(session->idr_events) {
// already streaming
respond(server->host(), peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
std::string_view target { req->message.request.target };
auto begin = std::find(std::begin(target), std::end(target), '=') + 1;
auto end = std::find(begin, std::end(target), '/');
std::string_view type { begin, (size_t)std::distance(begin, end) };
if(type == "audio"sv) {
seqn.next = &session_option;
session_option.option = const_cast<char*>("Session");
session_option.content = const_cast<char*>("DEADBEEFCAFE;timeout = 90");
}
else if(type != "video"sv && type != "control"sv) {
cmd_not_found(server->host(), peer, std::move(req));
return;
}
respond(server->host(), peer, &seqn, 200, "OK", req->sequenceNumber, {});
}
void cmd_announce(rtsp_server_t *server, const std::shared_ptr<session_t> &session, net::peer_t peer, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
option.option = const_cast<char*>("CSeq");
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str());
auto expected_state = state_e::STOPPED;
auto abort = session->state.compare_exchange_strong(expected_state, state_e::STARTING);
if(abort || !launch_event.peek()) {
//Either already streaming or /launch has not been used
if(!abort) {
session->state.store(state_e::STOPPED);
}
respond(server->host(), peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
auto launch_session { launch_event.pop() };
std::string_view payload { req->payload, (size_t)req->payloadLength };
std::vector<std::string_view> lines;
auto whitespace = [](char ch) {
return ch == '\n' || ch == '\r';
};
{
auto pos = std::begin(payload);
auto begin = pos;
while (pos != std::end(payload)) {
if (whitespace(*pos++)) {
lines.emplace_back(begin, pos - begin - 1);
while(pos != std::end(payload) && whitespace(*pos)) { ++pos; }
begin = pos;
}
}
}
std::string_view client;
std::unordered_map<std::string_view, std::string_view> args;
for(auto line : lines) {
auto type = line.substr(0, 2);
if(type == "s="sv) {
client = line.substr(2);
}
else if(type == "a=") {
auto pos = line.find(':');
auto name = line.substr(2, pos - 2);
auto val = line.substr(pos + 1);
if(val[val.size() -1] == ' ') {
val = val.substr(0, val.size() -1);
}
args.emplace(name, val);
}
}
// Initialize any omitted parameters to defaults
args.try_emplace("x-nv-video[0].encoderCscMode"sv, "0"sv);
args.try_emplace("x-nv-vqos[0].bitStreamFormat"sv, "0"sv);
args.try_emplace("x-nv-video[0].dynamicRangeMode"sv, "0"sv);
args.try_emplace("x-nv-aqos.packetDuration"sv, "5"sv);
try {
auto &config = session->config;
config.audio.channels = util::from_view(args.at("x-nv-audio.surround.numChannels"sv));
config.audio.mask = util::from_view(args.at("x-nv-audio.surround.channelMask"sv));
config.audio.packetDuration = util::from_view(args.at("x-nv-aqos.packetDuration"sv));
config.packetsize = util::from_view(args.at("x-nv-video[0].packetSize"sv));
config.monitor.height = util::from_view(args.at("x-nv-video[0].clientViewportHt"sv));
config.monitor.width = util::from_view(args.at("x-nv-video[0].clientViewportWd"sv));
config.monitor.framerate = util::from_view(args.at("x-nv-video[0].maxFPS"sv));
config.monitor.bitrate = util::from_view(args.at("x-nv-vqos[0].bw.maximumBitrateKbps"sv));
config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv));
config.monitor.numRefFrames = util::from_view(args.at("x-nv-video[0].maxNumReferenceFrames"sv));
config.monitor.encoderCscMode = util::from_view(args.at("x-nv-video[0].encoderCscMode"sv));
config.monitor.videoFormat = util::from_view(args.at("x-nv-vqos[0].bitStreamFormat"sv));
config.monitor.dynamicRange = util::from_view(args.at("x-nv-video[0].dynamicRangeMode"sv));
} catch(std::out_of_range &) {
respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
if(session->config.monitor.videoFormat != 0 && config::video.hevc_mode == 0) {
BOOST_LOG(warning) << "HEVC is disabled, yet the client requested HEVC"sv;
respond(server->host(), peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
auto &gcm_key = launch_session->gcm_key;
auto &iv = launch_session->iv;
std::copy(std::begin(gcm_key), std::end(gcm_key), std::begin(session->gcm_key));
std::copy(std::begin(iv), std::end(iv), std::begin(session->iv));
session->pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
session->idr_events = std::make_shared<video::idr_event_t::element_type>();
session->audioThread = std::thread {audioThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)};
session->videoThread = std::thread {videoThread, session, platf::from_sockaddr((sockaddr*)&peer->address.address)};
session->state.store(state_e::RUNNING);
respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {});
}
void cmd_play(rtsp_server_t *server, const std::shared_ptr<session_t> &session, net::peer_t peer, msg_t &&req) {
OPTION_ITEM option {};
// I know these string literals will not be modified
option.option = const_cast<char*>("CSeq");
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str());
respond(server->host(), peer, &option, 200, "OK", req->sequenceNumber, {});
}
void rtpThread(std::shared_ptr<safe::event_t<bool>> shutdown_event) {
input = std::make_shared<input::input_t>();
auto fg = util::fail_guard([&]() {
input.reset();
});
rtsp_server_t server(RTSP_SETUP_PORT);
server.map("OPTIONS"sv, &cmd_option);
server.map("DESCRIBE"sv, &cmd_describe);
server.map("SETUP"sv, &cmd_setup);
server.map("ANNOUNCE"sv, &cmd_announce);
server.map("PLAY"sv, &cmd_play);
while(!shutdown_event->peek()) {
server.iterate(std::min(500ms, config::stream.ping_timeout));
}
server.stop();
}
void print_msg(PRTSP_MESSAGE msg) {
std::string_view type = msg->type == TYPE_RESPONSE ? "RESPONSE"sv : "REQUEST"sv;
std::string_view payload { msg->payload, (size_t)msg->payloadLength };
std::string_view protocol { msg->protocol };
auto seqnm = msg->sequenceNumber;
std::string_view messageBuffer { msg->messageBuffer };
BOOST_LOG(debug) << "type ["sv << type << ']';
BOOST_LOG(debug) << "sequence number ["sv << seqnm << ']';
BOOST_LOG(debug) << "protocol :: "sv << protocol;
BOOST_LOG(debug) << "payload :: "sv << payload;
if(msg->type == TYPE_RESPONSE) {
auto &resp = msg->message.response;
auto statuscode = resp.statusCode;
std::string_view status { resp.statusString };
BOOST_LOG(debug) << "statuscode :: "sv << statuscode;
BOOST_LOG(debug) << "status :: "sv << status;
}
else {
auto& req = msg->message.request;
std::string_view command { req.command };
std::string_view target { req.target };
BOOST_LOG(debug) << "command :: "sv << command;
BOOST_LOG(debug) << "target :: "sv << target;
}
for(auto option = msg->options; option != nullptr; option = option->next) {
std::string_view content { option->content };
std::string_view name { option->option };
BOOST_LOG(debug) << name << " :: "sv << content;
}
BOOST_LOG(debug) << "---Begin MessageBuffer---"sv << std::endl << messageBuffer << std::endl << "---End MessageBuffer---"sv << std::endl;
}
}

sunshine/rtsp.h (new file, 25 lines)
View File

@ -0,0 +1,25 @@
//
// Created by loki on 2/2/20.
//
#ifndef SUNSHINE_RTSP_H
#define SUNSHINE_RTSP_H
#include <atomic>
#include "crypto.h"
#include "thread_safe.h"
namespace stream {
struct launch_session_t {
crypto::aes_t gcm_key;
crypto::aes_t iv;
};
extern safe::event_t<launch_session_t> launch_event;
void rtpThread(std::shared_ptr<safe::event_t<bool>> shutdown_event);
}
#endif //SUNSHINE_RTSP_H
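A sketch of the intended flow around launch_event and rtpThread, assuming a caller along the lines of main.cpp (the exact call site is not part of this diff): the HTTPS /launch handler fills a launch_session_t and raises launch_event, while a dedicated thread services RTSP until shutdown.

auto shutdown_event = std::make_shared<safe::event_t<bool>>();
std::thread rtsp_thread { stream::rtpThread, shutdown_event };

stream::launch_session_t launch_session;
// gcm_key and iv are filled from the client's rikey/rikeyid parameters (see nvhttp.cpp)
stream::launch_event.raise(launch_session);

// on shutdown
shutdown_event->raise(true);
rtsp_thread.join();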

File diff suppressed because it is too large

View File

@ -5,12 +5,32 @@
#ifndef SUNSHINE_STREAM_H
#define SUNSHINE_STREAM_H
#include <atomic>
#include <boost/asio.hpp>
#include "crypto.h"
#include "thread_safe.h"
#include "video.h"
#include "audio.h"
#include "crypto.h"
namespace input {
struct input_t;
}
namespace stream {
constexpr auto VIDEO_STREAM_PORT = 47998;
constexpr auto CONTROL_PORT = 47999;
constexpr auto AUDIO_STREAM_PORT = 48000;
namespace asio = boost::asio;
namespace sys = boost::system;
using asio::ip::tcp;
using asio::ip::udp;
enum class socket_e : int {
video,
audio
};
enum class state_e : int {
STOPPED,
@ -19,18 +39,64 @@ enum class state_e : int {
RUNNING,
};
struct launch_session_t {
using message_queue_t = std::shared_ptr<safe::queue_t<std::pair<std::uint16_t, std::string>>>;
using message_queue_queue_t = std::shared_ptr<safe::queue_t<std::tuple<socket_e, asio::ip::address, message_queue_t>>>;
struct config_t {
audio::config_t audio;
video::config_t monitor;
int packetsize;
bool sops;
std::optional<int> gcmap;
};
struct broadcast_ctx_t {
safe::event_t<bool> shutdown_event;
video::packet_queue_t video_packets;
audio::packet_queue_t audio_packets;
message_queue_queue_t session_queue;
std::thread video_thread;
std::thread audio_thread;
std::thread control_thread;
std::thread recv_thread;
asio::io_service io;
udp::socket video_sock { io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT) };
udp::socket audio_sock { io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT) };
};
struct session_t {
config_t config;
std::thread audioThread;
std::thread videoThread;
std::chrono::steady_clock::time_point pingTimeout;
udp::endpoint video_peer;
udp::endpoint audio_peer;
video::idr_event_t idr_events;
crypto::aes_t gcm_key;
crypto::aes_t iv;
bool has_process;
std::atomic<state_e> state;
};
extern safe::event_t<launch_session_t> launch_event;
extern std::atomic<state_e> session_state;
void videoThread(std::shared_ptr<session_t> session, std::string addr_str);
void audioThread(std::shared_ptr<session_t> session, std::string addr_str);
void rtpThread(std::shared_ptr<safe::event_t<bool>> shutdown_event);
void stop(session_t &session);
extern std::shared_ptr<input::input_t> input;
}
#endif //SUNSHINE_STREAM_H

View File

@ -8,6 +8,7 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include "utility.h"
@ -50,6 +51,26 @@ public:
return val;
}
// pop and view should not be used interchangeably
template<class Rep, class Period>
status_t pop(std::chrono::duration<Rep, Period> delay) {
std::unique_lock ul{_lock};
if (!_continue) {
return util::false_v<status_t>;
}
while (!_status) {
if (!_continue || _cv.wait_for(ul, delay) == std::cv_status::timeout) {
return util::false_v<status_t>;
}
}
auto val = std::move(_status);
_status = util::false_v<status_t>;
return val;
}
// pop and view should not be used interchangeably
const status_t &view() {
std::unique_lock ul{_lock};
@ -119,6 +140,26 @@ public:
return !_queue.empty();
}
template<class Rep, class Period>
status_t pop(std::chrono::duration<Rep, Period> delay) {
std::unique_lock ul{_lock};
if (!_continue) {
return util::false_v<status_t>;
}
while (_queue.empty()) {
if (!_continue || _cv.wait_for(ul, delay) == std::cv_status::timeout) {
return util::false_v<status_t>;
}
}
auto val = std::move(_queue.front());
_queue.erase(std::begin(_queue));
return val;
}
status_t pop() {
std::unique_lock ul{_lock};
@ -165,6 +206,108 @@ private:
std::vector<T> _queue;
};
template<class T>
class shared_t {
public:
using element_type = T;
using construct_f = std::function<int(element_type &)>;
using destruct_f = std::function<void(element_type &)>;
struct ptr_t {
shared_t *owner;
explicit ptr_t(shared_t *owner) : owner { owner } {}
ptr_t(ptr_t &&ptr) noexcept {
owner = ptr.owner;
ptr.owner = nullptr;
}
ptr_t(const ptr_t &ptr) noexcept {
auto tmp = ptr.owner->ref();
owner = tmp.owner;
tmp.owner = nullptr;
}
ptr_t &operator=(const ptr_t &ptr) noexcept {
return *this = std::move(*ptr.owner->ref());
}
ptr_t &operator=(ptr_t &&ptr) noexcept {
if(owner) {
release();
}
std::swap(owner, ptr.owner);
return *this;
}
~ptr_t() {
if(owner) {
release();
}
}
operator bool () const {
return owner != nullptr;
}
void release() {
std::lock_guard lg { owner->_lock };
auto c = owner->_count.fetch_sub(1, std::memory_order_acquire);
if(c - 1 == 0) {
owner->_destruct(*get());
(*this)->~element_type();
}
owner = nullptr;
}
element_type *get() const {
return reinterpret_cast<element_type*>(owner->_object_buf.data());
}
element_type *operator->() {
return reinterpret_cast<element_type*>(owner->_object_buf.data());
}
};
template<class FC, class FD>
shared_t(FC && fc, FD &&fd) : _construct { std::forward<FC>(fc) }, _destruct { std::forward<FD>(fd) } {}
[[nodiscard]] ptr_t ref() {
auto c = _count.fetch_add(1, std::memory_order_acquire);
if(!c) {
std::lock_guard lg { _lock };
new(_object_buf.data()) element_type;
if(_construct(*reinterpret_cast<element_type*>(_object_buf.data()))) {
return ptr_t { nullptr };
}
}
return ptr_t { this };
}
private:
construct_f _construct;
destruct_f _destruct;
std::array<std::uint8_t, sizeof(element_type)> _object_buf;
std::atomic<std::uint32_t> _count;
std::mutex _lock;
};
template<class T, class F_Construct, class F_Destruct>
auto make_shared(F_Construct &&fc, F_Destruct &&fd) {
return shared_t<T> {
std::forward<F_Construct>(fc), std::forward<F_Destruct>(fd)
};
}
}
#endif //SUNSHINE_THREAD_SAFE_H
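shared_t is a lazily constructed, reference-counted singleton: ref() runs the construct callback for the first reference and the destruct callback when the last ptr_t is released. A usage sketch mirroring how video.cpp (further down) shares a single capture thread between sessions:

static auto capture_thread = safe::make_shared<capture_thread_ctx_t>(start_capture, end_capture);

// hypothetical caller, one per streaming session
void start_session() {
  auto ref = capture_thread.ref();  // first caller triggers start_capture()
  if(!ref) {
    return;                         // start_capture() returned non-zero
  }
  // use ref-> ... ; once the last ref is released, end_capture() runs
}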

View File

@ -2,14 +2,15 @@
// Created by loki on 6/6/19.
//
#include <atomic>
#include <thread>
extern "C" {
#include <libavcodec/avcodec.h>
#include <libswscale/swscale.h>
}
#include "platform/common.h"
#include "thread_pool.h"
#include "config.h"
#include "video.h"
#include "main.h"
@ -32,9 +33,20 @@ void free_packet(AVPacket *packet) {
using ctx_t = util::safe_ptr<AVCodecContext, free_ctx>;
using frame_t = util::safe_ptr<AVFrame, free_frame>;
using sws_t = util::safe_ptr<SwsContext, sws_freeContext>;
using img_event_t = std::shared_ptr<safe::event_t<std::unique_ptr<platf::img_t>>>;
using img_event_t = std::shared_ptr<safe::event_t<std::shared_ptr<platf::img_t>>>;
auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) {
struct capture_ctx_t {
img_event_t images;
std::chrono::nanoseconds delay;
std::chrono::steady_clock::time_point next_frame;
};
struct capture_thread_ctx_t {
std::shared_ptr<safe::queue_t<capture_ctx_t>> capture_ctx_queue;
std::thread capture_thread;
};
[[nodiscard]] auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) {
avcodec_open2(ctx.get(), codec, options);
return util::fail_guard([&]() {
@ -42,7 +54,87 @@ auto open_codec(ctx_t &ctx, AVCodec *codec, AVDictionary **options) {
});
}
void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::img_t &img, packet_queue_t &packets) {
int capture_display(platf::img_t *img, std::unique_ptr<platf::display_t> &disp) {
auto status = disp->snapshot(img, display_cursor);
switch (status) {
case platf::capture_e::reinit: {
// We try this twice, in case we still get an error on reinitialization
for(int x = 0; x < 2; ++x) {
disp.reset();
disp = platf::display();
if(disp) {
break;
}
std::this_thread::sleep_for(200ms);
}
if(!disp) {
return -1;
}
return -1;
}
case platf::capture_e::error:
return -1;
// Prevent warning during compilation
case platf::capture_e::timeout:
case platf::capture_e::ok:
return 0;
default:
BOOST_LOG(error) << "Unrecognized capture status ["sv << (int)status << ']';
return -1;
}
}
void captureThread(std::shared_ptr<safe::queue_t<capture_ctx_t>> capture_ctx_queue) {
std::vector<capture_ctx_t> capture_ctxs;
auto fg = util::fail_guard([&]() {
capture_ctx_queue->stop();
// Stop all sessions listening to this thread
for(auto &capture_ctx : capture_ctxs) {
capture_ctx.images->stop();
}
for(auto &capture_ctx : capture_ctx_queue->unsafe()) {
capture_ctx.images->stop();
}
});
auto disp = platf::display();
while(capture_ctx_queue->running()) {
while(capture_ctx_queue->peek()) {
capture_ctxs.emplace_back(std::move(*capture_ctx_queue->pop()));
}
std::shared_ptr<platf::img_t> img = disp->alloc_img();
auto has_error = capture_display(img.get(), disp);
if(has_error) {
return;
}
auto time_point = std::chrono::steady_clock::now();
for(auto capture_ctx = std::begin(capture_ctxs); capture_ctx != std::end(capture_ctxs); ++capture_ctx) {
if(!capture_ctx->images->running()) {
capture_ctx = capture_ctxs.erase(capture_ctx);
continue;
}
if(time_point > capture_ctx->next_frame) {
continue;
}
capture_ctx->images->raise(img);
capture_ctx->next_frame = time_point + capture_ctx->delay;
}
}
}
void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::img_t &img, packet_queue_t &packets, void *channel_data) {
av_frame_make_writable(yuv_frame.get());
const int linesizes[2] {
@ -67,7 +159,7 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im
}
while (ret >= 0) {
packet_t packet { av_packet_alloc() };
auto packet = std::make_unique<packet_t::element_type>(nullptr);
ret = avcodec_receive_packet(ctx.get(), packet.get());
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
@ -79,17 +171,51 @@ void encode(int64_t frame, ctx_t &ctx, sws_t &sws, frame_t &yuv_frame, platf::im
std::abort();
}
packet->channel_data = channel_data;
packets->raise(std::move(packet));
}
}
void encodeThread(
img_event_t images,
int start_capture(capture_thread_ctx_t &capture_thread_ctx) {
capture_thread_ctx.capture_ctx_queue = std::make_shared<safe::queue_t<capture_ctx_t>>();
capture_thread_ctx.capture_thread = std::thread {
captureThread, capture_thread_ctx.capture_ctx_queue
};
return 0;
}
void end_capture(capture_thread_ctx_t &capture_thread_ctx) {
capture_thread_ctx.capture_ctx_queue->stop();
capture_thread_ctx.capture_thread.join();
}
void capture(
packet_queue_t packets,
idr_event_t idr_events,
config_t config) {
config_t config,
void *channel_data) {
int framerate = config.framerate;
auto images = std::make_shared<img_event_t::element_type>();
// Keep a reference count so the capture thread only runs while at least one other thread holds a reference to it
static auto capture_thread = safe::make_shared<capture_thread_ctx_t>(start_capture, end_capture);
auto ref = capture_thread.ref();
if(!ref) {
return;
}
ref->capture_ctx_queue->raise(capture_ctx_t {
images, std::chrono::floor<std::chrono::nanoseconds>(1s) / framerate, std::chrono::steady_clock::now()
});
if(!ref->capture_ctx_queue->running()) {
return;
}
AVCodec *codec;
if(config.videoFormat == 0) {
@ -217,7 +343,11 @@ void encodeThread(
// Initiate scaling context with correct height and width
sws_t sws;
while (auto img = images->pop()) {
while(auto img = images->pop()) {
if(!idr_events->running()) {
break;
}
auto new_width = img->width;
auto new_height = img->height;
@ -250,68 +380,11 @@ void encodeThread(
yuv_frame->pict_type = AV_PICTURE_TYPE_I;
}
encode(frame++, ctx, sws, yuv_frame, *img, packets);
encode(frame++, ctx, sws, yuv_frame, *img, packets, channel_data);
yuv_frame->pict_type = AV_PICTURE_TYPE_NONE;
}
}
void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config) {
display_cursor = true;
int framerate = config.framerate;
auto disp = platf::display();
if(!disp) {
packets->stop();
return;
}
img_event_t images {new img_event_t::element_type };
std::thread encoderThread { &encodeThread, images, packets, idr_events, config };
auto time_span = std::chrono::floor<std::chrono::nanoseconds>(1s) / framerate;
while(packets->running()) {
auto next_snapshot = std::chrono::steady_clock::now() + time_span;
auto img = disp->alloc_img();
auto status = disp->snapshot(img.get(), display_cursor);
switch(status) {
case platf::capture_e::reinit: {
// We try this twice, in case we still get an error on reinitialization
for(int x = 0; x < 2; ++x) {
disp.reset();
disp = platf::display();
if (disp) {
break;
}
std::this_thread::sleep_for(200ms);
}
if (!disp) {
packets->stop();
}
continue;
}
case platf::capture_e::timeout:
std::this_thread::sleep_until(next_snapshot);
continue;
case platf::capture_e::error:
packets->stop();
continue;
// Prevent warning during compilation
case platf::capture_e::ok:
break;
}
images->raise(std::move(img));
std::this_thread::sleep_until(next_snapshot);
}
images->stop();
encoderThread.join();
}
}

View File

@ -6,14 +6,37 @@
#define SUNSHINE_VIDEO_H
#include "thread_safe.h"
#include "platform/common.h"
extern "C" {
#include <libavcodec/avcodec.h>
}
struct AVPacket;
namespace video {
void free_packet(AVPacket *packet);
using packet_t = util::safe_ptr<AVPacket, free_packet>;
struct packet_raw_t : public AVPacket {
template<class P>
explicit packet_raw_t(P *user_data) : channel_data { user_data } {
av_init_packet(this);
}
explicit packet_raw_t(std::nullptr_t null) : channel_data { nullptr } {
av_init_packet(this);
}
~packet_raw_t() {
av_packet_unref(this);
}
void *channel_data;
};
using packet_t = std::unique_ptr<packet_raw_t>;
using packet_queue_t = std::shared_ptr<safe::queue_t<packet_t>>;
using idr_event_t = std::shared_ptr<safe::event_t<std::pair<int64_t, int64_t>>>;
using img_event_t = std::shared_ptr<safe::event_t<std::shared_ptr<platf::img_t>>>;
struct config_t {
int width;
@ -27,7 +50,11 @@ struct config_t {
int dynamicRange;
};
void capture_display(packet_queue_t packets, idr_event_t idr_events, config_t config);
void capture(
packet_queue_t packets,
idr_event_t idr_events,
config_t config,
void *channel_data);
}
#endif //SUNSHINE_VIDEO_H
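Finally, a sketch of the per-session video path under the new signature (the real consumer is in stream.cpp, whose diff is suppressed above); the session pointer goes in as channel_data and comes back on every encoded packet. It assumes pop() yields the packet_t directly, as images->pop() does in video.cpp, and that session and config are a session's shared_ptr and its video::config_t:

auto packets    = std::make_shared<video::packet_queue_t::element_type>();
auto idr_events = std::make_shared<video::idr_event_t::element_type>();

std::thread video_thread { video::capture, packets, idr_events, config, session.get() };

while(auto packet = packets->pop()) {
  auto *session_ptr = (session_t *)packet->channel_data;  // set in encode()
  // encrypt the packet and send it to session_ptr->video_peer
}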