Use NBIO for RPCN

This commit is contained in:
RipleyTom 2023-03-07 10:36:28 +01:00 committed by Megamouse
parent 65eb625843
commit f7a3d45c2c
2 changed files with 72 additions and 25 deletions

View File

@ -45,6 +45,8 @@
LOG_CHANNEL(rpcn_log, "rpcn"); LOG_CHANNEL(rpcn_log, "rpcn");
int get_native_error();
namespace rpcn namespace rpcn
{ {
localized_string_id rpcn_state_to_localized_string_id(rpcn::rpcn_state state) localized_string_id rpcn_state_to_localized_string_id(rpcn::rpcn_state state)
@ -498,13 +500,36 @@ namespace rpcn
if (n_recv == 0) if (n_recv == 0)
return recvn_result::recvn_nodata; return recvn_result::recvn_nodata;
pollfd poll_fd{};
while ((poll_fd.revents & POLLIN) != POLLIN && (poll_fd.revents & POLLRDNORM) != POLLRDNORM)
{
if (terminate)
return recvn_result::recvn_terminate;
poll_fd.fd = sockfd;
poll_fd.events = POLLIN;
poll_fd.revents = 0;
#ifdef _WIN32
int res_poll = WSAPoll(&poll_fd, 1, RPCN_TIMEOUT_INTERVAL);
#else
int res_poll = poll(&poll_fd, 1, RPCN_TIMEOUT_INTERVAL);
#endif
if (res_poll < 0)
{
rpcn_log.error("recvn poll failed with native error: %d)", get_native_error());
return recvn_result::recvn_fatal;
}
num_timeouts++; num_timeouts++;
if (num_timeouts >= 50) if (num_timeouts > (RPCN_TIMEOUT / RPCN_TIMEOUT_INTERVAL))
{ {
rpcn_log.error("recvn timeout with %d bytes received", n_recv); rpcn_log.error("recvn timeout with %d bytes received", n_recv);
return recvn_result::recvn_timeout; return recvn_result::recvn_timeout;
} }
} }
}
else else
{ {
if (res == 0) if (res == 0)
@ -514,7 +539,7 @@ namespace rpcn
return recvn_result::recvn_noconn; return recvn_result::recvn_noconn;
} }
rpcn_log.error("recvn failed with error: %d:%s", res, get_wolfssl_error(read_wssl, res)); rpcn_log.error("recvn failed with error: %d:%s(native: %d)", res, get_wolfssl_error(read_wssl, res), get_native_error());
return recvn_result::recvn_fatal; return recvn_result::recvn_fatal;
} }
@ -532,21 +557,46 @@ namespace rpcn
usz n_sent = 0; usz n_sent = 0;
while (n_sent != packet.size()) while (n_sent != packet.size())
{ {
if (terminate)
return error_and_disconnect("send_packet was forcefully aborted");
if (!connected) if (!connected)
return false; return false;
int res = wolfSSL_write(write_wssl, reinterpret_cast<const char*>(packet.data()), packet.size()); int res = wolfSSL_write(write_wssl, reinterpret_cast<const char*>(packet.data() + n_sent), packet.size() - n_sent);
if (res <= 0) if (res <= 0)
{ {
if (wolfSSL_want_write(write_wssl)) if (wolfSSL_want_write(write_wssl))
{ {
pollfd poll_fd{};
while ((poll_fd.revents & POLLOUT) != POLLOUT)
{
if (terminate)
return error_and_disconnect("send_packet was forcefully aborted");
poll_fd.fd = sockfd;
poll_fd.events = POLLOUT;
poll_fd.revents = 0;
#ifdef _WIN32
int res_poll = WSAPoll(&poll_fd, 1, RPCN_TIMEOUT_INTERVAL);
#else
int res_poll = poll(&poll_fd, 1, RPCN_TIMEOUT_INTERVAL);
#endif
if (res_poll < 0)
{
rpcn_log.error("send_packet failed with native error: %d)", get_native_error());
return error_and_disconnect("send_packet failed on poll");
}
num_timeouts++; num_timeouts++;
if (num_timeouts >= 50) if (num_timeouts > (RPCN_TIMEOUT / RPCN_TIMEOUT_INTERVAL))
{ {
rpcn_log.error("send_packet timeout with %d bytes sent", n_sent); rpcn_log.error("send_packet timeout with %d bytes sent", n_sent);
return error_and_disconnect("Failed to send all the bytes"); return error_and_disconnect("Failed to send all the bytes");
} }
} }
}
else else
{ {
rpcn_log.error("send_packet failed with error: %s", get_wolfssl_error(write_wssl, res)); rpcn_log.error("send_packet failed with error: %s", get_wolfssl_error(write_wssl, res));
@ -686,6 +736,8 @@ namespace rpcn
return false; return false;
} }
wolfSSL_set_using_nonblock(read_wssl, 1);
memset(&addr_rpcn, 0, sizeof(addr_rpcn)); memset(&addr_rpcn, 0, sizeof(addr_rpcn));
addr_rpcn.sin_port = std::bit_cast<u16, be_t<u16>>(port); // htons addr_rpcn.sin_port = std::bit_cast<u16, be_t<u16>>(port); // htons
@ -706,21 +758,6 @@ namespace rpcn
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
#ifdef _WIN32
u32 timeout = 200; // 200ms
#else
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 200000; // 200ms
#endif
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<char*>(&timeout), sizeof(timeout)) < 0)
{
rpcn_log.error("connect: Failed to setsockopt!");
state = rpcn_state::failure_other;
return false;
}
if (::connect(sockfd, reinterpret_cast<struct sockaddr*>(&addr_rpcn), sizeof(addr_rpcn)) != 0) if (::connect(sockfd, reinterpret_cast<struct sockaddr*>(&addr_rpcn), sizeof(addr_rpcn)) != 0)
{ {
rpcn_log.error("connect: Failed to connect to RPCN server!"); rpcn_log.error("connect: Failed to connect to RPCN server!");
@ -730,6 +767,13 @@ namespace rpcn
rpcn_log.notice("connect: Connection successful"); rpcn_log.notice("connect: Connection successful");
#ifdef _WIN32
u_long _true = 1;
ensure(::ioctlsocket(sockfd, FIONBIO, &_true) == 0);
#else
ensure(::fcntl(sockfd, F_SETFL, ::fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK) == 0);
#endif
sockaddr_in client_addr; sockaddr_in client_addr;
socklen_t client_addr_size = sizeof(client_addr); socklen_t client_addr_size = sizeof(client_addr);
if (getsockname(sockfd, reinterpret_cast<struct sockaddr*>(&client_addr), &client_addr_size) != 0) if (getsockname(sockfd, reinterpret_cast<struct sockaddr*>(&client_addr), &client_addr_size) != 0)

View File

@ -493,6 +493,9 @@ namespace rpcn
atomic_t<u32> addr_sig{}; atomic_t<u32> addr_sig{};
atomic_t<u16> port_sig{}; atomic_t<u16> port_sig{};
atomic_t<u32> local_addr_sig{}; atomic_t<u32> local_addr_sig{};
static constexpr int RPCN_TIMEOUT_INTERVAL = 50; // 50ms
static constexpr int RPCN_TIMEOUT = 10'000; // 10s
}; };
} // namespace rpcn } // namespace rpcn