Log: minor improvements

This commit is contained in:
Nekotekina 2017-11-21 21:45:02 +03:00
parent 74c248150b
commit b60d3a3dae

View File

@ -69,10 +69,17 @@ namespace logs
::HANDLE m_fmap;
#endif
uchar* m_fptr{};
z_stream m_zs{};
semaphore<> m_m;
alignas(128) atomic_t<u64> m_buf{0}; // MSB (40 bit): push begin, LSB (24 bis): push size
alignas(128) atomic_t<u64> m_out{0}; // Amount of bytes written to file
uchar m_zout[65536];
// Write buffered logs immediately
bool flush(u64 bufv);
public:
file_writer(const std::string& name);
@ -355,7 +362,10 @@ logs::file_writer::file_writer(const std::string& name)
m_fout.open(log_name, fs::rewrite);
// Compressed log
m_fout2.open(log_name + ".gz", fs::rewrite);
if (!m_fout2.open(log_name + ".gz", fs::rewrite) || deflateInit2(&m_zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) != Z_OK)
{
m_fout2.close();
}
}
catch (...)
{
@ -366,15 +376,6 @@ logs::file_writer::file_writer(const std::string& name)
{
thread_ctrl::set_native_priority(-1);
z_stream zs{};
uchar out[65536];
bool zs_init = true;
if (!m_fout2 || deflateInit2(&zs, 9, Z_DEFLATED, 16 + 15, 9, Z_DEFAULT_STRATEGY) != Z_OK)
{
zs_init = false;
m_fout2.close();
}
while (true)
{
const u64 bufv = m_buf;
@ -386,42 +387,9 @@ logs::file_writer::file_writer(const std::string& name)
continue;
}
const u64 st = +m_out;
const u64 end = std::min<u64>((st + s_log_size) & ~(s_log_size - 1), bufv >> 24);
if (end > st)
if (!flush(bufv))
{
const u64 size = end - st;
if (m_fout && m_fout.write(m_fptr + st % s_log_size, size) != size)
{
m_fout.close();
}
if (m_fout2)
{
zs.avail_in = size;
zs.next_in = m_fptr + st % s_log_size;
do
{
zs.avail_out = sizeof(out);
zs.next_out = out;
if (deflate(&zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_fout2.write(out, sizeof(out) - zs.avail_out) != sizeof(out) - zs.avail_out)
{
m_fout2.close();
break;
}
}
while (zs.avail_out == 0);
}
m_out += end - st;
}
else
{
if (st == -1)
if (m_out == -1)
{
break;
}
@ -429,29 +397,6 @@ logs::file_writer::file_writer(const std::string& name)
std::this_thread::sleep_for(10ms);
}
}
if (zs_init)
{
if (m_fout2)
{
zs.avail_in = 0;
zs.next_in = nullptr;
do
{
zs.avail_out = sizeof(out);
zs.next_out = out;
if (deflate(&zs, Z_FINISH) == Z_STREAM_ERROR || m_fout2.write(out, sizeof(out) - zs.avail_out) != sizeof(out) - zs.avail_out)
{
break;
}
}
while (zs.avail_out == 0);
}
deflateEnd(&zs);
}
});
}
@ -466,6 +411,26 @@ logs::file_writer::~file_writer()
m_out = -1;
m_writer.join();
if (m_fout2)
{
m_zs.avail_in = 0;
m_zs.next_in = nullptr;
do
{
m_zs.avail_out = sizeof(m_zout);
m_zs.next_out = m_zout;
if (deflate(&m_zs, Z_FINISH) == Z_STREAM_ERROR || m_fout2.write(m_zout, sizeof(m_zout) - m_zs.avail_out) != sizeof(m_zout) - m_zs.avail_out)
{
break;
}
}
while (m_zs.avail_out == 0);
deflateEnd(&m_zs);
}
#ifdef _WIN32
UnmapViewOfFile(m_fptr);
CloseHandle(m_fmap);
@ -474,17 +439,67 @@ logs::file_writer::~file_writer()
#endif
}
bool logs::file_writer::flush(u64 bufv)
{
semaphore_lock lock(m_m);
const u64 st = +m_out;
const u64 end = std::min<u64>((st + s_log_size) & ~(s_log_size - 1), bufv >> 24);
if (end > st)
{
// Avoid writing too big fragments
const u64 size = std::min<u64>(end - st, sizeof(m_zout) / 2);
// Write uncompressed
if (m_fout && m_fout.write(m_fptr + st % s_log_size, size) != size)
{
m_fout.close();
}
// Write compressed
if (m_fout2)
{
m_zs.avail_in = size;
m_zs.next_in = m_fptr + st % s_log_size;
do
{
m_zs.avail_out = sizeof(m_zout);
m_zs.next_out = m_zout;
if (deflate(&m_zs, Z_NO_FLUSH) == Z_STREAM_ERROR || m_fout2.write(m_zout, sizeof(m_zout) - m_zs.avail_out) != sizeof(m_zout) - m_zs.avail_out)
{
deflateEnd(&m_zs);
m_fout2.close();
break;
}
}
while (m_zs.avail_out == 0);
}
m_out += size;
return true;
}
return false;
}
void logs::file_writer::log(logs::level sev, const char* text, std::size_t size)
{
while (true)
// TODO: write bigger fragment directly in blocking manner
while (size && size <= 0xffffff)
{
u64 bufv;
const auto pos = m_buf.atomic_op([&](u64& v) -> uchar*
{
const u64 v1 = v >> 24;
const u64 v2 = v & 0xffffff;
if (v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size)
if (UNLIKELY(v2 + size > 0xffffff || v1 + v2 + size >= m_out + s_log_size))
{
bufv = v;
return nullptr;
}
@ -494,7 +509,16 @@ void logs::file_writer::log(logs::level sev, const char* text, std::size_t size)
if (UNLIKELY(!pos))
{
std::this_thread::yield();
if ((bufv & 0xffffff) + size > 0xffffff || bufv & 0xffffff)
{
// Concurrency limit reached
std::this_thread::yield();
}
else
{
// Queue is full, need to write out
flush(bufv);
}
continue;
}