From b60d3a3dae20cc7609f214e139e1ff9ff0f11f69 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Tue, 21 Nov 2017 21:45:02 +0300 Subject: [PATCH] Log: minor improvements --- Utilities/Log.cpp | 166 ++++++++++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 71 deletions(-) diff --git a/Utilities/Log.cpp b/Utilities/Log.cpp index 3f7d283493..21e52a69dc 100644 --- a/Utilities/Log.cpp +++ b/Utilities/Log.cpp @@ -69,10 +69,17 @@ namespace logs ::HANDLE m_fmap; #endif uchar* m_fptr{}; + z_stream m_zs{}; + semaphore<> m_m; alignas(128) atomic_t m_buf{0}; // MSB (40 bit): push begin, LSB (24 bis): push size alignas(128) atomic_t m_out{0}; // Amount of bytes written to file + uchar m_zout[65536]; + + // Write buffered logs immediately + bool flush(u64 bufv); + public: file_writer(const std::string& name); @@ -355,7 +362,10 @@ logs::file_writer::file_writer(const std::string& name) m_fout.open(log_name, fs::rewrite); // Compressed log - m_fout2.open(log_name + ".gz", fs::rewrite); + if (!m_fout2.open(log_name + ".gz", fs::rewrite) || deflateInit2(&m_zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) != Z_OK) + { + m_fout2.close(); + } } catch (...) { @@ -366,15 +376,6 @@ logs::file_writer::file_writer(const std::string& name) { thread_ctrl::set_native_priority(-1); - z_stream zs{}; - uchar out[65536]; - bool zs_init = true; - if (!m_fout2 || deflateInit2(&zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) != Z_OK) - { - zs_init = false; - m_fout2.close(); - } - while (true) { const u64 bufv = m_buf; @@ -386,42 +387,9 @@ logs::file_writer::file_writer(const std::string& name) continue; } - const u64 st = +m_out; - const u64 end = std::min((st + s_log_size) & ~(s_log_size - 1), bufv >> 24); - - if (end > st) + if (!flush(bufv)) { - const u64 size = end - st; - - if (m_fout && m_fout.write(m_fptr + st % s_log_size, size) != size) - { - m_fout.close(); - } - - if (m_fout2) - { - zs.avail_in = size; - zs.next_in = m_fptr + st % s_log_size; - - do - { - zs.avail_out = sizeof(out); - zs.next_out = out; - - if (deflate(&zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_fout2.write(out, sizeof(out) - zs.avail_out) != sizeof(out) - zs.avail_out) - { - m_fout2.close(); - break; - } - } - while (zs.avail_out == 0); - } - - m_out += end - st; - } - else - { - if (st == -1) + if (m_out == -1) { break; } @@ -429,29 +397,6 @@ logs::file_writer::file_writer(const std::string& name) std::this_thread::sleep_for(10ms); } } - - if (zs_init) - { - if (m_fout2) - { - zs.avail_in = 0; - zs.next_in = nullptr; - - do - { - zs.avail_out = sizeof(out); - zs.next_out = out; - - if (deflate(&zs, Z_FINISH) == Z_STREAM_ERROR || m_fout2.write(out, sizeof(out) - zs.avail_out) != sizeof(out) - zs.avail_out) - { - break; - } - } - while (zs.avail_out == 0); - } - - deflateEnd(&zs); - } }); } @@ -466,6 +411,26 @@ logs::file_writer::~file_writer() m_out = -1; m_writer.join(); + if (m_fout2) + { + m_zs.avail_in = 0; + m_zs.next_in = nullptr; + + do + { + m_zs.avail_out = sizeof(m_zout); + m_zs.next_out = m_zout; + + if (deflate(&m_zs, Z_FINISH) == Z_STREAM_ERROR || m_fout2.write(m_zout, sizeof(m_zout) - m_zs.avail_out) != sizeof(m_zout) - m_zs.avail_out) + { + break; + } + } + while (m_zs.avail_out == 0); + + deflateEnd(&m_zs); + } + #ifdef _WIN32 UnmapViewOfFile(m_fptr); CloseHandle(m_fmap); @@ -474,17 +439,67 @@ logs::file_writer::~file_writer() #endif } +bool logs::file_writer::flush(u64 bufv) +{ + semaphore_lock lock(m_m); + + const u64 st = +m_out; + const u64 end = std::min((st + s_log_size) & ~(s_log_size - 1), bufv >> 24); + + if (end > st) + { + // Avoid writing too big fragments + const u64 size = std::min(end - st, sizeof(m_zout) / 2); + + // Write uncompressed + if (m_fout && m_fout.write(m_fptr + st % s_log_size, size) != size) + { + m_fout.close(); + } + + // Write compressed + if (m_fout2) + { + m_zs.avail_in = size; + m_zs.next_in = m_fptr + st % s_log_size; + + do + { + m_zs.avail_out = sizeof(m_zout); + m_zs.next_out = m_zout; + + if (deflate(&m_zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_fout2.write(m_zout, sizeof(m_zout) - m_zs.avail_out) != sizeof(m_zout) - m_zs.avail_out) + { + deflateEnd(&m_zs); + m_fout2.close(); + break; + } + } + while (m_zs.avail_out == 0); + } + + m_out += size; + return true; + } + + return false; +} + void logs::file_writer::log(logs::level sev, const char* text, std::size_t size) { - while (true) + // TODO: write bigger fragment directly in blocking manner + while (size && size <= 0xffffff) { + u64 bufv; + const auto pos = m_buf.atomic_op([&](u64& v) -> uchar* { const u64 v1 = v >> 24; const u64 v2 = v & 0xffffff; - if (v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size) + if (UNLIKELY(v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size)) { + bufv = v; return nullptr; } @@ -494,7 +509,16 @@ void logs::file_writer::log(logs::level sev, const char* text, std::size_t size) if (UNLIKELY(!pos)) { - std::this_thread::yield(); + if ((bufv & 0xffffff) + size > 0xffffff || bufv & 0xffffff) + { + // Concurrency limit reached + std::this_thread::yield(); + } + else + { + // Queue is full, need to write out + flush(bufv); + } continue; }