Implement basic network flow control (#2803)

Co-authored-by: Cameron Gutman <aicommander@gmail.com>
ns6089 2024-07-11 03:03:16 +03:00 committed by GitHub
parent 6607a28a68
commit 037c61dc99
9 changed files with 352 additions and 101 deletions


@@ -67,6 +67,7 @@ list(PREPEND PLATFORM_LIBRARIES
libstdc++.a
libwinpthread.a
libssp.a
ntdll
ksuser
wsock32
ws2_32


@@ -10,6 +10,8 @@
#include <mutex>
#include <string>
#include <boost/core/noncopyable.hpp>
#include "src/config.h"
#include "src/logging.h"
#include "src/stat_trackers.h"
@@ -799,4 +801,30 @@ namespace platf {
*/
std::vector<supported_gamepad_t> &
supported_gamepads(input_t *input);
struct high_precision_timer: private boost::noncopyable {
virtual ~high_precision_timer() = default;
/**
* @brief Sleep for the duration
* @param duration Sleep duration
*/
virtual void
sleep_for(const std::chrono::nanoseconds &duration) = 0;
/**
* @brief Check if platform-specific timer backend has been initialized successfully
* @return `true` on success, `false` on error
*/
virtual
operator bool() = 0;
};
/**
* @brief Create platform-specific timer capable of high-precision sleep
* @return A unique pointer to timer
*/
std::unique_ptr<high_precision_timer>
create_high_precision_timer();
} // namespace platf
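
A minimal usage sketch of the high_precision_timer interface added above (not part of the commit; the free function and include path are illustrative assumptions, while the platf:: API follows the declarations in this header):

#include <chrono>
#include "platform/common.h"

void pacing_example() {
  // Create the platform-specific timer and check that its backend initialized
  auto timer = platf::create_high_precision_timer();
  if (!timer || !*timer) {
    return;  // no usable backend; the video broadcast thread aborts in this case
  }
  // Sleep for 500 microseconds; Windows backs this with a waitable timer,
  // while Linux and macOS fall through to std::this_thread::sleep_for()
  timer->sleep_for(std::chrono::microseconds(500));
}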


@@ -933,4 +933,21 @@ namespace platf {
return std::make_unique<deinit_t>();
}
class linux_high_precision_timer: public high_precision_timer {
public:
void
sleep_for(const std::chrono::nanoseconds &duration) override {
std::this_thread::sleep_for(duration);
}
operator bool() override {
return true;
}
};
std::unique_ptr<high_precision_timer>
create_high_precision_timer() {
return std::make_unique<linux_high_precision_timer>();
}
} // namespace platf


@@ -507,6 +507,22 @@ namespace platf {
return std::make_unique<qos_t>(sockfd, reset_options);
}
class macos_high_precision_timer: public high_precision_timer {
public:
void
sleep_for(const std::chrono::nanoseconds &duration) override {
std::this_thread::sleep_for(duration);
}
operator bool() override {
return true;
}
};
std::unique_ptr<high_precision_timer>
create_high_precision_timer() {
return std::make_unique<macos_high_precision_timer>();
}
} // namespace platf
namespace dyn {


@@ -161,9 +161,6 @@ namespace platf::dxgi {
int
init(const ::video::config_t &config, const std::string &display_name);
void
high_precision_sleep(std::chrono::nanoseconds duration);
capture_e
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override;
@@ -184,7 +181,7 @@ namespace platf::dxgi {
DXGI_FORMAT capture_format;
D3D_FEATURE_LEVEL feature_level;
util::safe_ptr_v2<std::remove_pointer_t<HANDLE>, BOOL, CloseHandle> timer;
std::unique_ptr<high_precision_timer> timer = create_high_precision_timer();
typedef enum _D3DKMT_SCHEDULINGPRIORITYCLASS {
D3DKMT_SCHEDULINGPRIORITYCLASS_IDLE, ///< Idle priority class


@@ -182,27 +182,6 @@ namespace platf::dxgi {
release_frame();
}
void
display_base_t::high_precision_sleep(std::chrono::nanoseconds duration) {
if (!timer) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with uninitialized timer";
return;
}
if (duration < 0s) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with negative duration";
return;
}
if (duration > 5s) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with unexpectedly large duration (>5s)";
return;
}
LARGE_INTEGER due_time;
due_time.QuadPart = duration.count() / -100;
SetWaitableTimer(timer.get(), &due_time, 0, nullptr, nullptr, false);
WaitForSingleObject(timer.get(), INFINITE);
}
capture_e
display_base_t::capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) {
auto adjust_client_frame_rate = [&]() -> DXGI_RATIONAL {
@@ -268,7 +247,7 @@ namespace platf::dxgi {
status = capture_e::timeout;
}
else {
high_precision_sleep(sleep_period);
timer->sleep_for(sleep_period);
std::chrono::nanoseconds overshoot_ns = std::chrono::steady_clock::now() - sleep_target;
log_sleep_overshoot(overshoot_ns);
@@ -799,15 +778,9 @@ namespace platf::dxgi {
<< "Max Full Luminance : "sv << desc1.MaxFullFrameLuminance << " nits"sv;
}
// Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+)
timer.reset(CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS));
if (!timer) {
timer.reset(CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS));
if (!timer) {
auto winerr = GetLastError();
BOOST_LOG(error) << "Failed to create timer: "sv << winerr;
return -1;
}
if (!timer || !*timer) {
BOOST_LOG(error) << "Uninitialized high precision timer";
return -1;
}
return 0;


@@ -61,6 +61,38 @@
#define WLAN_API_MAKE_VERSION(_major, _minor) (((DWORD) (_minor)) << 16 | (_major))
#endif
#include <winternl.h>
extern "C" {
NTSTATUS NTAPI
NtSetTimerResolution(ULONG DesiredResolution, BOOLEAN SetResolution, PULONG CurrentResolution);
}
namespace {
std::atomic<bool> used_nt_set_timer_resolution = false;
bool
nt_set_timer_resolution_max() {
ULONG minimum, maximum, current;
if (!NT_SUCCESS(NtQueryTimerResolution(&minimum, &maximum, &current)) ||
!NT_SUCCESS(NtSetTimerResolution(maximum, TRUE, &current))) {
return false;
}
return true;
}
bool
nt_set_timer_resolution_min() {
ULONG minimum, maximum, current;
if (!NT_SUCCESS(NtQueryTimerResolution(&minimum, &maximum, &current)) ||
!NT_SUCCESS(NtSetTimerResolution(minimum, TRUE, &current))) {
return false;
}
return true;
}
} // namespace
namespace bp = boost::process;
using namespace std::literals;
@@ -1115,8 +1147,15 @@ namespace platf {
// Enable MMCSS scheduling for DWM
DwmEnableMMCSS(true);
// Reduce timer period to 1ms
timeBeginPeriod(1);
// Reduce timer period to 0.5ms
if (nt_set_timer_resolution_max()) {
used_nt_set_timer_resolution = true;
}
else {
BOOST_LOG(error) << "NtSetTimerResolution() failed, falling back to timeBeginPeriod()";
timeBeginPeriod(1);
used_nt_set_timer_resolution = false;
}
// Promote ourselves to high priority class
SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS);
@@ -1199,8 +1238,16 @@ namespace platf {
// Demote ourselves back to normal priority class
SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS);
// End our 1ms timer request
timeEndPeriod(1);
// End our 0.5ms timer request
if (used_nt_set_timer_resolution) {
used_nt_set_timer_resolution = false;
if (!nt_set_timer_resolution_min()) {
BOOST_LOG(error) << "nt_set_timer_resolution_min() failed even though nt_set_timer_resolution_max() succeeded";
}
}
else {
timeEndPeriod(1);
}
// Disable MMCSS scheduling for DWM
DwmEnableMMCSS(false);
@@ -1756,4 +1803,55 @@ namespace platf {
return output;
}
class win32_high_precision_timer: public high_precision_timer {
public:
win32_high_precision_timer() {
// Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+)
timer = CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer) {
timer = CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS);
if (!timer) {
BOOST_LOG(error) << "Unable to create high_precision_timer, CreateWaitableTimerEx() failed: " << GetLastError();
}
}
}
~win32_high_precision_timer() {
if (timer) CloseHandle(timer);
}
void
sleep_for(const std::chrono::nanoseconds &duration) override {
if (!timer) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with uninitialized timer";
return;
}
if (duration < 0s) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with negative duration";
return;
}
if (duration > 5s) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with unexpectedly large duration (>5s)";
return;
}
LARGE_INTEGER due_time;
due_time.QuadPart = duration.count() / -100;
SetWaitableTimer(timer, &due_time, 0, nullptr, nullptr, false);
WaitForSingleObject(timer, INFINITE);
}
operator bool() override {
return timer != NULL;
}
private:
HANDLE timer = NULL;
};
std::unique_ptr<high_precision_timer>
create_high_precision_timer() {
return std::make_unique<win32_high_precision_timer>();
}
} // namespace platf


@@ -29,6 +29,8 @@ extern "C" {
#include "thread_safe.h"
#include "utility.h"
#include "platform/common.h"
#define IDX_START_A 0
#define IDX_START_B 1
#define IDX_INVALIDATE_REF_FRAMES 2
@@ -656,7 +658,7 @@ namespace stream {
auto parity_shards = (data_shards * fecpercentage + 99) / 100;
// increase the FEC percentage for this frame if the parity shard minimum is not met
if (parity_shards < minparityshards) {
if (parity_shards < minparityshards && fecpercentage != 0) {
parity_shards = minparityshards;
fecpercentage = (100 * parity_shards) / data_shards;
@@ -664,15 +666,6 @@ namespace stream {
}
auto nr_shards = data_shards + parity_shards;
if (nr_shards > DATA_SHARDS_MAX) {
BOOST_LOG(warning)
<< "Number of fragments for reed solomon exceeds DATA_SHARDS_MAX"sv << std::endl
<< nr_shards << " > "sv << DATA_SHARDS_MAX
<< ", skipping error correction"sv;
nr_shards = data_shards;
fecpercentage = 0;
}
util::buffer_t<char> shards { nr_shards * (blocksize + prefixsize) };
util::buffer_t<uint8_t *> shards_p { nr_shards };
@@ -691,7 +684,7 @@ namespace stream {
next += copy_len;
}
if (data_shards + parity_shards <= DATA_SHARDS_MAX) {
if (fecpercentage != 0) {
// packets = parity_shards + data_shards
rs_t rs { reed_solomon_new(data_shards, parity_shards) };
@@ -1255,13 +1248,29 @@ namespace stream {
platf::adjust_thread_priority(platf::thread_priority_e::high);
stat_trackers::min_max_avg_tracker<uint16_t> frame_processing_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_send_batch_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_fec_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_network_latency_tracker;
crypto::aes_t iv(12);
auto timer = platf::create_high_precision_timer();
if (!timer || !*timer) {
BOOST_LOG(error) << "Failed to create timer, aborting video broadcast thread";
return;
}
auto ratecontrol_next_frame_start = std::chrono::steady_clock::now();
while (auto packet = packets->pop()) {
if (shutdown_event->peek()) {
break;
}
auto frame_packet_start_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds fec_time = 0ns;
auto session = (session_t *) packet->channel_data;
auto lowseq = session->video.lowseq;
@@ -1336,41 +1345,75 @@ namespace stream {
payload = std::string_view { (char *) payload_new.data(), payload_new.size() };
// With a fecpercentage of 255, if payload_new is broken up into more than a 100 data_shards
// it will generate greater than DATA_SHARDS_MAX shards.
// Therefore, we start breaking the data up into three separate fec blocks.
auto multi_fec_threshold = 90 * blocksize;
// There are 2 bits for FEC block count for a maximum of 4 FEC blocks
constexpr auto MAX_FEC_BLOCKS = 4;
// We can go up to 4 fec blocks, but 3 is plenty
constexpr auto MAX_FEC_BLOCKS = 3;
// The max number of data shards per block is found by solving this system of equations for D:
// D = 255 - P
// P = D * F
// which results in the solution:
// D = 255 / (1 + F)
// multiplied by 100 since F is the percentage as an integer:
// D = (255 * 100) / (100 + F)
auto max_data_shards_per_fec_block = (DATA_SHARDS_MAX * 100) / (100 + fecPercentage);
// Compute the number of FEC blocks needed for this frame using the block size and max shards
auto max_data_per_fec_block = max_data_shards_per_fec_block * blocksize;
auto fec_blocks_needed = (payload.size() + (max_data_per_fec_block - 1)) / max_data_per_fec_block;
// If the number of FEC blocks needed exceeds the protocol limit, turn off FEC for this frame.
// For normal FEC percentages, this should only happen for enormous frames (over 800 packets at 20%).
if (fec_blocks_needed > MAX_FEC_BLOCKS) {
BOOST_LOG(warning) << "Skipping FEC for abnormally large encoded frame (needed "sv << fec_blocks_needed << " FEC blocks)"sv;
fecPercentage = 0;
fec_blocks_needed = MAX_FEC_BLOCKS;
}
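// Worked example (illustrative numbers, not part of this commit): with DATA_SHARDS_MAX = 255
// as the derivation above assumes, fecPercentage = 20 and blocksize = 1024, the limit is
// max_data_shards_per_fec_block = (255 * 100) / 120 = 212 data shards, i.e. about
// 212 * 1024 = 217088 bytes of payload per FEC block, so a 500 KB frame needs
// 3 FEC blocks and stays within the MAX_FEC_BLOCKS limit checked above.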
std::array<std::string_view, MAX_FEC_BLOCKS> fec_blocks;
decltype(fec_blocks)::iterator
fec_blocks_begin = std::begin(fec_blocks),
fec_blocks_end = std::begin(fec_blocks) + 1;
fec_blocks_end = std::begin(fec_blocks) + fec_blocks_needed;
auto lastBlockIndex = 0;
if (payload.size() > multi_fec_threshold) {
BOOST_LOG(verbose) << "Generating multiple FEC blocks"sv;
BOOST_LOG(verbose) << "Generating "sv << fec_blocks_needed << " FEC blocks"sv;
// Align individual fec blocks to blocksize
auto unaligned_size = payload.size() / MAX_FEC_BLOCKS;
auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize;
// Align individual FEC blocks to blocksize
auto unaligned_size = payload.size() / fec_blocks_needed;
auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize;
// Break the data up into 3 blocks, each containing multiple complete video packets.
fec_blocks[0] = payload.substr(0, aligned_size);
fec_blocks[1] = payload.substr(aligned_size, aligned_size);
fec_blocks[2] = payload.substr(aligned_size * 2);
lastBlockIndex = 2 << 6;
fec_blocks_end = std::end(fec_blocks);
// If we exceed the 10-bit FEC packet index (which means our frame exceeded 4096 packets),
// the frame will be unrecoverable. Log an error for this case.
if (aligned_size / blocksize >= 1024) {
BOOST_LOG(error) << "Encoder produced a frame too large to send! Is the encoder broken? (needed "sv << (aligned_size / blocksize) << " packets)"sv;
}
else {
BOOST_LOG(verbose) << "Generating single FEC block"sv;
fec_blocks[0] = payload;
// Split the data into aligned FEC blocks
for (int x = 0; x < fec_blocks_needed; ++x) {
if (x == fec_blocks_needed - 1) {
// The last block must extend to the end of the payload
fec_blocks[x] = payload.substr(x * aligned_size);
}
else {
// Earlier blocks just extend to the next block offset
fec_blocks[x] = payload.substr(x * aligned_size, aligned_size);
}
}
try {
// Pace at around 80% of 1 Gbps: std::giga::num (1 Gbps) * 80 / 100 (percent) / 1000 (ms) / blocksize (bytes per packet) / 8 (bits per byte)
size_t ratecontrol_packets_in_1ms = std::giga::num * 80 / 100 / 1000 / blocksize / 8;
// Send less than 64K in a single batch.
// On Windows, batches above 64K seem to bypass SO_SNDBUF regardless of its size,
// appear in "Other I/O" and begin waiting for interrupts.
// This gives inconsistent performance so we'd rather avoid it.
size_t send_batch_size = 64 * 1024 / blocksize;
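// Worked example (illustrative blocksize, not part of this commit): with blocksize = 1024,
// ratecontrol_packets_in_1ms = 10^9 * 80 / 100 / 1000 / 1024 / 8 = 97 packets per millisecond
// (roughly 0.8 Gbps of payload), and send_batch_size = 65536 / 1024 = 64 packets per batch.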
// Don't ignore the last ratecontrol group of the previous frame
auto ratecontrol_frame_start = std::max(ratecontrol_next_frame_start, std::chrono::steady_clock::now());
size_t ratecontrol_frame_packets_sent = 0;
size_t ratecontrol_group_packets_sent = 0;
auto blockIndex = 0;
std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view &current_payload) {
auto packets = (current_payload.size() + (blocksize - 1)) / blocksize;
@@ -1383,7 +1426,7 @@ namespace stream {
// Match multiFecFlags with Moonlight
inspect->packet.multiFecFlags = 0x10;
inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex;
inspect->packet.multiFecBlocks = (blockIndex << 4) | ((fec_blocks_needed - 1) << 6);
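// Bit layout example (not part of this commit): bits 4-5 of multiFecBlocks carry the current
// FEC block index and bits 6-7 carry the last block index (fec_blocks_needed - 1), so with
// fec_blocks_needed = 3 and blockIndex = 1 this stores (1 << 4) | (2 << 6) = 0x90.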
if (x == 0) {
inspect->packet.flags |= FLAG_SOF;
@@ -1394,10 +1437,27 @@ namespace stream {
}
}
auto fec_start = std::chrono::steady_clock::now();
// If video encryption is enabled, we allocate space for the encryption header before each shard
auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets,
session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0);
fec_time += std::chrono::steady_clock::now() - fec_start;
auto peer_address = session->video.peer.address();
auto batch_info = platf::batched_send_info_t {
nullptr,
shards.prefixsize + shards.blocksize,
0,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};
size_t next_shard_to_send = 0;
// set FEC info now that we know for sure what our percentage will be for this frame
for (auto x = 0; x < shards.size(); ++x) {
auto *inspect = (video_packet_raw_t *) shards.data(x);
@@ -1415,7 +1475,7 @@ namespace stream {
inspect->rtp.sequenceNumber = util::endian::big<uint16_t>(lowseq + x);
inspect->rtp.timestamp = util::endian::big<uint32_t>(timestamp);
inspect->packet.multiFecBlocks = (blockIndex << 4) | lastBlockIndex;
inspect->packet.multiFecBlocks = (blockIndex << 4) | ((fec_blocks_needed - 1) << 6);
inspect->packet.frameIndex = packet->frame_index();
// Encrypt this shard if video encryption is enabled
@@ -1438,35 +1498,87 @@ namespace stream {
std::copy(std::begin(iv), std::end(iv), prefix->iv);
session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, prefix->tag, &iv);
}
if (x - next_shard_to_send + 1 >= send_batch_size ||
x + 1 == shards.size()) {
// Do pacing within the frame.
// Also trigger pacing before the first send_batch() of the frame
// to account for the last send_batch() of the previous frame.
if (ratecontrol_group_packets_sent >= ratecontrol_packets_in_1ms ||
ratecontrol_frame_packets_sent == 0) {
auto due = ratecontrol_frame_start +
std::chrono::duration_cast<std::chrono::nanoseconds>(1ms) *
ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;
auto now = std::chrono::steady_clock::now();
if (now < due) {
timer->sleep_for(due - now);
}
ratecontrol_group_packets_sent = 0;
}
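// Pacing example (illustrative numbers, not part of this commit): with
// ratecontrol_packets_in_1ms = 97, after 194 packets of this frame have been sent the next
// batch is not released before ratecontrol_frame_start + 2ms, spreading the frame's packets
// across the frame interval instead of bursting them all onto the wire at once.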
size_t current_batch_size = x - next_shard_to_send + 1;
batch_info.buffer = shards.prefix(next_shard_to_send);
batch_info.block_count = current_batch_size;
auto batch_start_time = std::chrono::steady_clock::now();
// Use a batched send if it's supported on this platform
if (!platf::send_batch(batch_info)) {
// Batched send is not available, so send each packet individually
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto y = 0; y < current_batch_size; y++) {
auto send_info = platf::send_info_t {
shards.prefix(next_shard_to_send + y),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};
platf::send(send_info);
}
}
if (config::sunshine.min_log_level <= 1) {
// Print send_batch() latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: individual send_batch() latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double send_batch_latency = (std::chrono::steady_clock::now() - batch_start_time).count() / 1000000.;
frame_send_batch_latency_tracker.collect_and_callback_on_interval(send_batch_latency, print_info, 20s);
}
ratecontrol_group_packets_sent += current_batch_size;
ratecontrol_frame_packets_sent += current_batch_size;
next_shard_to_send = x + 1;
}
}
auto peer_address = session->video.peer.address();
auto batch_info = platf::batched_send_info_t {
shards.shards.begin(),
shards.prefixsize + shards.blocksize,
shards.nr_shards,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};
// remember this in case the next frame comes immediately
ratecontrol_next_frame_start = ratecontrol_frame_start +
std::chrono::duration_cast<std::chrono::nanoseconds>(1ms) *
ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;
// Use a batched send if it's supported on this platform
if (!platf::send_batch(batch_info)) {
// Batched send is not available, so send each packet individually
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto x = 0; x < shards.size(); ++x) {
auto send_info = platf::send_info_t {
shards.prefix(x),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};
if (config::sunshine.min_log_level <= 1) {
// Print frame FEC latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame FEC latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double fec_latency = fec_time.count() / 1000000.;
frame_fec_latency_tracker.collect_and_callback_on_interval(fec_latency, print_info, 20s);
}
platf::send(send_info);
}
if (config::sunshine.min_log_level <= 1) {
// Print frame network latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame complete network latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double network_latency = (std::chrono::steady_clock::now() - frame_packet_start_time).count() / 1000000.;
frame_network_latency_tracker.collect_and_callback_on_interval(network_latency, print_info, 20s);
}
if (packet->is_idr()) {
@@ -1618,6 +1730,14 @@ namespace stream {
return -1;
}
// Set video socket send buffer size (SO_SENDBUF) to 1MB
try {
ctx.video_sock.set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
}
catch (...) {
BOOST_LOG(error) << "Failed to set video socket send buffer size (SO_SENDBUF)";
}
ctx.video_sock.bind(udp::endpoint(protocol, video_port), ec);
if (ec) {
BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << video_port << "]: "sv << ec.message();


@@ -606,7 +606,8 @@ namespace util {
return _deleter;
}
explicit operator bool() const {
explicit
operator bool() const {
return _p != nullptr;
}