Refactor periodic loggers and limit batch size for small packets (#2835)

* Refactor periodic loggers

* Limit network batch size also by packet count

Previously it was limited only by size, and exceeding 64 packets in a
single batch is asking for problems.
This commit is contained in:
ns6089 2024-07-13 23:55:03 +03:00 committed by GitHub
parent 8187a28afc
commit 18e7dfb190
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 186 additions and 110 deletions

View File

@ -17,6 +17,9 @@ extern boost::log::sources::severity_logger<int> warning;
extern boost::log::sources::severity_logger<int> error;
extern boost::log::sources::severity_logger<int> fatal;
#include "config.h"
#include "stat_trackers.h"
/**
* @brief Handles the initialization and deinitialization of the logging system.
*/
@ -75,4 +78,130 @@ namespace logging {
*/
void
print_help(const char *name);
/**
* @brief A helper class for tracking and logging numerical values across a period of time
* @examples
* min_max_avg_periodic_logger<int> logger(debug, "Test time value", "ms", 5s);
* logger.collect_and_log(1);
* // ...
* logger.collect_and_log(2);
* // after 5 seconds
* logger.collect_and_log(3);
* // In the log:
* // [2024:01:01:12:00:00]: Debug: Test time value (min/max/avg): 1ms/3ms/2.00ms
* @examples_end
*/
template <typename T>
class min_max_avg_periodic_logger {
public:
min_max_avg_periodic_logger(boost::log::sources::severity_logger<int> &severity,
std::string_view message,
std::string_view units,
std::chrono::seconds interval_in_seconds = std::chrono::seconds(20)):
severity(severity),
message(message),
units(units),
interval(interval_in_seconds),
enabled(config::sunshine.min_log_level <= severity.default_severity()) {}
void
collect_and_log(const T &value) {
if (enabled) {
auto print_info = [&](const T &min_value, const T &max_value, double avg_value) {
auto f = stat_trackers::two_digits_after_decimal();
if constexpr (std::is_floating_point_v<T>) {
BOOST_LOG(severity.get()) << message << " (min/max/avg): " << f % min_value << units << "/" << f % max_value << units << "/" << f % avg_value << units;
}
else {
BOOST_LOG(severity.get()) << message << " (min/max/avg): " << min_value << units << "/" << max_value << units << "/" << f % avg_value << units;
}
};
tracker.collect_and_callback_on_interval(value, print_info, interval);
}
}
void
collect_and_log(std::function<T()> func) {
if (enabled) collect_and_log(func());
}
void
reset() {
if (enabled) tracker.reset();
}
bool
is_enabled() const {
return enabled;
}
private:
std::reference_wrapper<boost::log::sources::severity_logger<int>> severity;
std::string message;
std::string units;
std::chrono::seconds interval;
bool enabled;
stat_trackers::min_max_avg_tracker<T> tracker;
};
/**
* @brief A helper class for tracking and logging short time intervals across a period of time
* @examples
* time_delta_periodic_logger logger(debug, "Test duration", 5s);
* logger.first_point_now();
* // ...
* logger.second_point_now_and_log();
* // after 5 seconds
* logger.first_point_now();
* // ...
* logger.second_point_now_and_log();
* // In the log:
* // [2024:01:01:12:00:00]: Debug: Test duration (min/max/avg): 1.23ms/3.21ms/2.31ms
* @examples_end
*/
class time_delta_periodic_logger {
public:
time_delta_periodic_logger(boost::log::sources::severity_logger<int> &severity,
std::string_view message,
std::chrono::seconds interval_in_seconds = std::chrono::seconds(20)):
logger(severity, message, "ms", interval_in_seconds) {}
void
first_point(const std::chrono::steady_clock::time_point &point) {
if (logger.is_enabled()) point1 = point;
}
void
first_point_now() {
if (logger.is_enabled()) first_point(std::chrono::steady_clock::now());
}
void
second_point_and_log(const std::chrono::steady_clock::time_point &point) {
if (logger.is_enabled()) {
logger.collect_and_log(std::chrono::duration<double, std::milli>(point - point1).count());
}
}
void
second_point_now_and_log() {
if (logger.is_enabled()) second_point_and_log(std::chrono::steady_clock::now());
}
void
reset() {
if (logger.is_enabled()) logger.reset();
}
bool
is_enabled() const {
return logger.is_enabled();
}
private:
std::chrono::steady_clock::time_point point1 = std::chrono::steady_clock::now();
min_max_avg_periodic_logger<double> logger;
};
} // namespace logging

View File

@ -383,7 +383,7 @@ namespace nvenc {
}
{
auto f = stat_trackers::one_digit_after_decimal();
auto f = stat_trackers::two_digits_after_decimal();
BOOST_LOG(debug) << "NvEnc: requested encoded frame size " << f % (client_config.bitrate / 8. / client_config.framerate) << " kB";
}
@ -501,15 +501,7 @@ namespace nvenc {
BOOST_LOG(error) << "NvEncUnlockBitstream failed: " << last_error_string;
}
if (config::sunshine.min_log_level <= 1) {
// Print encoded frame size stats to debug log every 20 seconds
auto callback = [&](float stat_min, float stat_max, double stat_avg) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "NvEnc: encoded frame sizes (min max avg) " << f % stat_min << " " << f % stat_max << " " << f % stat_avg << " kB";
};
using namespace std::literals;
encoder_state.frame_size_tracker.collect_and_callback_on_interval(encoded_frame.data.size() / 1000., callback, 20s);
}
encoder_state.frame_size_logger.collect_and_log(encoded_frame.data.size() / 1000.);
return encoded_frame;
}

View File

@ -8,7 +8,7 @@
#include "nvenc_config.h"
#include "nvenc_encoded_frame.h"
#include "src/stat_trackers.h"
#include "src/logging.h"
#include "src/video.h"
#include <ffnvcodec/nvEncodeAPI.h>
@ -89,7 +89,7 @@ namespace nvenc {
uint64_t last_encoded_frame_index = 0;
bool rfi_needs_confirmation = false;
std::pair<uint64_t, uint64_t> last_rfi_range;
stat_trackers::min_max_avg_tracker<float> frame_size_tracker;
logging::min_max_avg_periodic_logger<double> frame_size_logger = { debug, "NvEnc: encoded frame sizes in kB", "" };
} encoder_state;
};

View File

@ -14,7 +14,6 @@
#include "src/config.h"
#include "src/logging.h"
#include "src/stat_trackers.h"
#include "src/thread_safe.h"
#include "src/utility.h"
#include "src/video_colorspace.h"
@ -520,19 +519,7 @@ namespace platf {
protected:
// collect capture timing data (at loglevel debug)
stat_trackers::min_max_avg_tracker<double> sleep_overshoot_tracker;
void
log_sleep_overshoot(std::chrono::nanoseconds overshoot_ns) {
if (config::sunshine.min_log_level <= 1) {
// Print sleep overshoot stats to debug log every 20 seconds
auto print_info = [&](double min_overshoot, double max_overshoot, double avg_overshoot) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Sleep overshoot (min/max/avg): " << f % min_overshoot << "ms/" << f % max_overshoot << "ms/" << f % avg_overshoot << "ms";
};
// std::chrono::nanoseconds overshoot_ns = std::chrono::steady_clock::now() - next_frame;
sleep_overshoot_tracker.collect_and_callback_on_interval(overshoot_ns.count() / 1000000., print_info, 20s);
}
}
logging::time_delta_periodic_logger sleep_overshoot_logger = { debug, "Frame capture sleep overshoot" };
};
class mic_t {

View File

@ -806,16 +806,15 @@ namespace cuda {
handle.reset();
});
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up

View File

@ -1193,17 +1193,16 @@ namespace platf {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up
@ -1417,17 +1416,16 @@ namespace platf {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up

View File

@ -129,17 +129,16 @@ namespace wl {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up
@ -265,17 +264,16 @@ namespace wl {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up

View File

@ -481,17 +481,16 @@ namespace platf {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up
@ -627,17 +626,16 @@ namespace platf {
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override {
auto next_frame = std::chrono::steady_clock::now();
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
auto now = std::chrono::steady_clock::now();
if (next_frame > now) {
std::this_thread::sleep_for(next_frame - now);
sleep_overshoot_logger.first_point(next_frame);
sleep_overshoot_logger.second_point_now_and_log();
}
now = std::chrono::steady_clock::now();
std::chrono::nanoseconds overshoot_ns = now - next_frame;
log_sleep_overshoot(overshoot_ns);
next_frame += delay;
if (next_frame < now) { // some major slowdown happened; we couldn't keep up

View File

@ -17,7 +17,6 @@ typedef long NTSTATUS;
#include "src/config.h"
#include "src/logging.h"
#include "src/platform/common.h"
#include "src/stat_trackers.h"
#include "src/video.h"
namespace platf {
@ -218,7 +217,7 @@ namespace platf::dxgi {
SetThreadExecutionState(ES_CONTINUOUS);
});
sleep_overshoot_tracker.reset();
sleep_overshoot_logger.reset();
while (true) {
// This will return false if the HDR state changes or for any number of other
@ -248,8 +247,8 @@ namespace platf::dxgi {
}
else {
timer->sleep_for(sleep_period);
std::chrono::nanoseconds overshoot_ns = std::chrono::steady_clock::now() - sleep_target;
log_sleep_overshoot(overshoot_ns);
sleep_overshoot_logger.first_point(sleep_target);
sleep_overshoot_logger.second_point_now_and_log();
status = snapshot(pull_free_image_cb, img_out, 0ms, *cursor);

View File

@ -11,4 +11,9 @@ namespace stat_trackers {
return boost::format("%1$.1f");
}
boost::format
two_digits_after_decimal() {
return boost::format("%1$.2f");
}
} // namespace stat_trackers

View File

@ -15,6 +15,9 @@ namespace stat_trackers {
boost::format
one_digit_after_decimal();
boost::format
two_digits_after_decimal();
template <typename T>
class min_max_avg_tracker {
public:
@ -22,7 +25,10 @@ namespace stat_trackers {
void
collect_and_callback_on_interval(T stat, const callback_function &callback, std::chrono::seconds interval_in_seconds) {
if (std::chrono::steady_clock::now() > data.last_callback_time + interval_in_seconds) {
if (data.calls == 0) {
data.last_callback_time = std::chrono::steady_clock::now();
}
else if (std::chrono::steady_clock::now() > data.last_callback_time + interval_in_seconds) {
callback(data.stat_min, data.stat_max, data.stat_total / data.calls);
data = {};
}
@ -39,7 +45,7 @@ namespace stat_trackers {
private:
struct {
std::chrono::steady_clock::steady_clock::time_point last_callback_time = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point last_callback_time = std::chrono::steady_clock::now();
T stat_min = std::numeric_limits<T>::max();
T stat_max = std::numeric_limits<T>::min();
double stat_total = 0;

View File

@ -24,7 +24,6 @@ extern "C" {
#include "input.h"
#include "logging.h"
#include "network.h"
#include "stat_trackers.h"
#include "stream.h"
#include "sync.h"
#include "system_tray.h"
@ -1248,11 +1247,11 @@ namespace stream {
// Video traffic is sent on this thread
platf::adjust_thread_priority(platf::thread_priority_e::high);
stat_trackers::min_max_avg_tracker<uint16_t> frame_processing_latency_tracker;
logging::min_max_avg_periodic_logger<double> frame_processing_latency_logger(debug, "Frame processing latency", "ms");
stat_trackers::min_max_avg_tracker<double> frame_send_batch_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_fec_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_network_latency_tracker;
logging::time_delta_periodic_logger frame_send_batch_latency_logger(debug, "Network: each send_batch() latency");
logging::time_delta_periodic_logger frame_fec_latency_logger(debug, "Network: each FEC block latency");
logging::time_delta_periodic_logger frame_network_latency_logger(debug, "Network: frame's overall network latency");
crypto::aes_t iv(12);
@ -1269,8 +1268,7 @@ namespace stream {
break;
}
auto frame_packet_start_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds fec_time = 0ns;
frame_network_latency_logger.first_point_now();
auto session = (session_t *) packet->channel_data;
auto lowseq = session->video.lowseq;
@ -1309,17 +1307,8 @@ namespace stream {
};
uint16_t latency = duration_to_latency(std::chrono::steady_clock::now() - *packet->frame_timestamp);
if (config::sunshine.min_log_level <= 1) {
// Print frame processing latency stats to debug log every 20 seconds
auto print_info = [&](uint16_t min_latency, uint16_t max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Frame processing latency (min/max/avg): " << f % (min_latency / 10.) << "ms/" << f % (max_latency / 10.) << "ms/" << f % (avg_latency / 10.) << "ms";
};
frame_processing_latency_tracker.collect_and_callback_on_interval(latency, print_info, 20s);
}
frame_header.frame_processing_latency = latency;
frame_processing_latency_logger.collect_and_log(latency / 10.);
}
else {
frame_header.frame_processing_latency = 0;
@ -1408,6 +1397,10 @@ namespace stream {
// appear in "Other I/O" and begin waiting for interrupts.
// This gives inconsistent performance so we'd rather avoid it.
size_t send_batch_size = 64 * 1024 / blocksize;
// Also don't exceed 64 packets, which can happen when Moonlight requests
// unusually small packet size.
// Generic Segmentation Offload on Linux can't do more than 64.
send_batch_size = std::min<size_t>(64, send_batch_size);
// Don't ignore the last ratecontrol group of the previous frame
auto ratecontrol_frame_start = std::max(ratecontrol_next_frame_start, std::chrono::steady_clock::now());
@ -1438,13 +1431,11 @@ namespace stream {
}
}
auto fec_start = std::chrono::steady_clock::now();
frame_fec_latency_logger.first_point_now();
// If video encryption is enabled, we allocate space for the encryption header before each shard
auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets,
session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0);
fec_time += std::chrono::steady_clock::now() - fec_start;
frame_fec_latency_logger.second_point_now_and_log();
auto peer_address = session->video.peer.address();
auto batch_info = platf::batched_send_info_t {
@ -1523,7 +1514,7 @@ namespace stream {
batch_info.buffer = shards.prefix(next_shard_to_send);
batch_info.block_count = current_batch_size;
auto batch_start_time = std::chrono::steady_clock::now();
frame_send_batch_latency_logger.first_point_now();
// Use a batched send if it's supported on this platform
if (!platf::send_batch(batch_info)) {
// Batched send is not available, so send each packet individually
@ -1541,15 +1532,7 @@ namespace stream {
platf::send(send_info);
}
}
if (config::sunshine.min_log_level <= 1) {
// Print send_batch() latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: individual send_batch() latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double send_batch_latency = (std::chrono::steady_clock::now() - batch_start_time).count() / 1000000.;
frame_send_batch_latency_tracker.collect_and_callback_on_interval(send_batch_latency, print_info, 20s);
}
frame_send_batch_latency_logger.second_point_now_and_log();
ratecontrol_group_packets_sent += current_batch_size;
ratecontrol_frame_packets_sent += current_batch_size;
@ -1562,25 +1545,7 @@ namespace stream {
std::chrono::duration_cast<std::chrono::nanoseconds>(1ms) *
ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;
if (config::sunshine.min_log_level <= 1) {
// Print frame FEC latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame FEC latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double fec_latency = fec_time.count() / 1000000.;
frame_fec_latency_tracker.collect_and_callback_on_interval(fec_latency, print_info, 20s);
}
if (config::sunshine.min_log_level <= 1) {
// Print frame network latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame complete network latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double network_latency = (std::chrono::steady_clock::now() - frame_packet_start_time).count() / 1000000.;
frame_network_latency_tracker.collect_and_callback_on_interval(network_latency, print_info, 20s);
}
frame_network_latency_logger.second_point_now_and_log();
if (packet->is_idr()) {
BOOST_LOG(verbose) << "Key Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv;