Fix potential deadlock

This commit is contained in:
loki 2019-12-08 23:31:37 +01:00
parent eb57c35ffc
commit 75d17a3d59
2 changed files with 22 additions and 52 deletions

View File

@ -11,6 +11,7 @@ struct config_t {
};
using packet_t = util::buffer_t<std::uint8_t>;
using packet_queue_t = std::shared_ptr<safe::queue_t<packet_t>>;
void capture(std::shared_ptr<safe::queue_t<packet_t>> packets, config_t config);
}

View File

@ -108,7 +108,9 @@ struct session_t {
std::thread controlThread;
std::chrono::steady_clock::time_point pingTimeout;
int client_state;
video::packet_queue_t video_packets;
audio::packet_queue_t audio_packets;
crypto::aes_t gcm_key;
crypto::aes_t iv;
@ -411,7 +413,7 @@ void print_msg(PRTSP_MESSAGE msg) {
}
using frame_queue_t = std::vector<video::packet_t>;
video::packet_t next_packet(uint16_t &frame, std::shared_ptr<safe::queue_t<video::packet_t>> &packets, frame_queue_t &packet_queue) {
video::packet_t next_packet(uint16_t &frame, video::packet_queue_t &packets, frame_queue_t &packet_queue) {
auto packet = packets->pop();
if(!packet) {
@ -440,27 +442,7 @@ video::packet_t next_packet(uint16_t &frame, std::shared_ptr<safe::queue_t<video
++frame;
return packet;
}
/*
std::vector<uint8_t> replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) {
std::vector<uint8_t> replaced;
auto search = [&](auto it) {
return std::search(it, std::end(original), std::begin(old), std::end(old));
};
auto begin = std::begin(original);
for(auto next = search(begin); next != std::end(original); next = search(++next)) {
std::copy(begin, next, std::back_inserter(replaced));
std::copy(std::begin(_new), std::end(_new), std::back_inserter(replaced));
next = begin = next + old.size();
}
std::copy(begin, std::end(original), std::back_inserter(replaced));
return replaced;
}
*/
std::vector<uint8_t> replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) {
std::vector<uint8_t> replaced;
@ -547,7 +529,8 @@ void controlThread(video::event_queue_t idr_events) {
// something went wrong :(
std::cout << "failed to verify tag"sv << std::endl;
session.client_state = 0;
session.video_packets->stop();
session.audio_packets->stop();
}
if(tagged_cipher_length >= 16 + session.iv.size()) {
@ -558,9 +541,10 @@ void controlThread(video::event_queue_t idr_events) {
input::passthrough(input, plaintext.data());
});
while(session.client_state > 0) {
while(session.video_packets->running()) {
if(std::chrono::steady_clock::now() > session.pingTimeout) {
session.client_state = 0;
session.video_packets->stop();
session.audio_packets->stop();
}
server.iterate(500ms);
@ -575,7 +559,7 @@ std::optional<udp::endpoint> recv_peer(udp::socket &sock) {
};
udp::endpoint peer;
while (session.client_state > 0) {
while (session.video_packets->running()) {
asio::deadline_timer timer { EXECUTOR((&sock)) };
timer.expires_from_now(boost::posix_time::seconds(2));
timer.async_wait([&](sys::error_code c){
@ -613,19 +597,12 @@ void audioThread() {
return;
}
std::shared_ptr<safe::queue_t<audio::packet_t>> packets{new safe::queue_t<audio::packet_t>};
auto &packets = session.audio_packets;
std::thread captureThread{audio::capture, packets, config.audio};
uint16_t frame{1};
while (auto packet = packets->pop()) {
if(session.client_state == 0) {
packets->stop();
break;
}
audio_packet_t audio_packet { (audio_packet_raw_t*)malloc(sizeof(audio_packet_raw_t) + packet->size()) };
audio_packet->rtp.sequenceNumber = util::endian::big(frame++);
@ -652,20 +629,13 @@ void videoThread(video::event_queue_t idr_events) {
return;
}
video::packet_queue_t packets{new safe::queue_t<video::packet_t>};
auto &packets = session.video_packets;
std::thread captureThread{video::capture_display, packets, idr_events, config.monitor};
frame_queue_t packet_queue;
uint16_t frame{1};
while (auto packet = next_packet(frame, packets, packet_queue)) {
if(session.client_state == 0) {
packets->stop();
break;
}
std::string_view payload{(char *) packet->data, (size_t) packet->size};
std::vector<uint8_t> payload_new;
@ -841,7 +811,7 @@ void cmd_setup(host_t &host, peer_t peer, msg_t &&req) {
auto seqn_str = std::to_string(req->sequenceNumber);
seqn.content = const_cast<char*>(seqn_str.c_str());
if(session.client_state >= 0) {
if(session.video_packets) {
// already streaming
respond(host, peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {});
@ -877,7 +847,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
auto seqn_str = std::to_string(req->sequenceNumber);
option.content = const_cast<char*>(seqn_str.c_str());
if(session.client_state >= 0) {
if(session.video_packets) {
// already streaming
respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {});
@ -942,9 +912,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv));
} catch(std::out_of_range &) {
// This piece of code is reached when for some reason, the payload length received < payload length send
// Not sure if this is an issue with Sunshine or Moonlight or the network
// TODO: find out
respond(host, peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {});
return;
}
@ -953,7 +921,9 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) {
std::copy(std::begin(iv), std::end(iv), std::begin(session.iv));
session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout;
session.client_state = 1;
session.video_packets = std::make_shared<video::packet_queue_t::element_type>();
session.audio_packets = std::make_shared<audio::packet_queue_t::element_type>();
video::event_queue_t idr_events { new video::event_queue_t::element_type };
session.audioThread = std::thread {audioThread};
@ -976,8 +946,6 @@ void cmd_play(host_t &host, peer_t peer, msg_t &&req) {
}
void rtpThread() {
session.client_state = -1;
rtsp_server_t server(RTSP_SETUP_PORT);
server.map("OPTIONS"sv, &cmd_option);
@ -990,12 +958,13 @@ void rtpThread() {
while(true) {
server.iterate(1s);
if(session.client_state == 0) {
if(session.video_packets && !session.video_packets->running()) {
session.audioThread.join();
session.videoThread.join();
session.controlThread.join();
session.client_state = -1;
session.video_packets = video::packet_queue_t();
session.audio_packets = audio::packet_queue_t();
}
}
}