Improve the socket stress test to better test fullduplex

This commit is contained in:
goldsimon 2018-04-18 22:03:11 +02:00
parent 3abc8ae161
commit 71c13c6079

View File

@ -80,14 +80,20 @@
#define TEST_MODE_NONBLOCKING 0x04
#define TEST_MODE_WAIT 0x08
#define TEST_MODE_RECVTIMEO 0x10
#define TEST_MODE_SLEEP 0x20
static int sockets_stresstest_numthreads;
struct test_settings {
struct sockaddr_storage addr;
int start_client;
int loop_cnt;
};
struct sockets_stresstest_fullduplex {
int s;
volatile int closed;
};
static void
fill_test_data(void *buf, size_t buf_len_bytes)
@ -156,6 +162,15 @@ recv_and_check_data_return_offset(int s, char *rxbuf, size_t rxbufsize, size_t r
}
*closed = 0;
LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_TRACE, ("%s %d rx %d\n", dbg, s, (int)ret));
if (ret == -1) {
/* TODO: for this to work, 'errno' has to support multithreading... */
int err = errno;
if (err == ENOTCONN) {
*closed = 1;
return 0;
}
LWIP_ASSERT("err == 0", err == 0);
}
LWIP_ASSERT("ret > 0", ret > 0);
return check_test_data(rxbuf, rxoff + ret);
}
@ -321,7 +336,7 @@ sockets_stresstest_wait_readable_nonblock(int s, int timeout_ms)
return 0;
}
static int sockets_stresstest_rand_mode(int allow_wait)
static int sockets_stresstest_rand_mode(int allow_wait, int allow_rx)
{
u32_t random_value = LWIP_RAND();
#if LWIP_SOCKET_SELECT
@ -334,6 +349,9 @@ static int sockets_stresstest_rand_mode(int allow_wait)
return TEST_MODE_POLL;
}
#endif
if (!allow_rx) {
return TEST_MODE_SLEEP;
}
#if LWIP_SO_RCVTIMEO
if (random_value & TEST_MODE_RECVTIMEO) {
return TEST_MODE_RECVTIMEO;
@ -368,6 +386,11 @@ sockets_stresstest_wait_readable(int mode, int s, int timeout_ms)
return sockets_stresstest_wait_readable_wait_peek(s, timeout_ms);
case TEST_MODE_NONBLOCKING:
return sockets_stresstest_wait_readable_nonblock(s, timeout_ms);
case TEST_MODE_SLEEP:
{
sys_msleep(timeout_ms);
return 1;
}
default:
LWIP_ASSERT("invalid mode", 0);
break;
@ -375,6 +398,35 @@ sockets_stresstest_wait_readable(int mode, int s, int timeout_ms)
return 0;
}
#if LWIP_NETCONN_FULLDUPLEX
static void
sockets_stresstest_conn_client_r(void *arg)
{
struct sockets_stresstest_fullduplex *fd = (struct sockets_stresstest_fullduplex *)arg;
int s = fd->s;
size_t rxoff = 0;
char rxbuf[TEST_TXRX_BUFSIZE];
while (1) {
int closed;
if (fd->closed) {
break;
}
rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli");
if (fd->closed) {
break;
}
if (closed) {
lwip_close(s);
break;
}
}
SYS_ARCH_DEC(sockets_stresstest_numthreads, 1);
LWIP_ASSERT("", sockets_stresstest_numthreads >= 0);
}
#endif
static void
sockets_stresstest_conn_client(void *arg)
{
@ -385,6 +437,8 @@ sockets_stresstest_conn_client(void *arg)
char rxbuf[TEST_TXRX_BUFSIZE];
size_t rxoff = 0;
u32_t max_time = sys_now() + (TEST_TIME_SECONDS * 1000);
int do_rx = 1;
struct sockets_stresstest_fullduplex *data = NULL;
memcpy(&addr, arg, sizeof(addr));
LWIP_ASSERT("", addr.ss_family == AF_INET);
@ -397,37 +451,59 @@ sockets_stresstest_conn_client(void *arg)
/* connect to the server */
s = lwip_socket(addr.ss_family, SOCK_STREAM, 0);
LWIP_ASSERT("s >= 0", s >= 0);
#if LWIP_NETCONN_FULLDUPLEX
if (LWIP_RAND() % 1) {
sys_thread_t t;
data = (struct sockets_stresstest_fullduplex*)mem_malloc(sizeof(struct sockets_stresstest_fullduplex));
LWIP_ASSERT("data != NULL", data != 0);
SYS_ARCH_INC(sockets_stresstest_numthreads, 1);
data->s = s;
data->closed = 0;
t = sys_thread_new("sockets_stresstest_conn_client_r", sockets_stresstest_conn_client_r, data, 0, 0);
LWIP_ASSERT("thread != NULL", t != 0);
do_rx = 0;
}
#endif
/* @todo: nonblocking connect? */
ret = lwip_connect(s, (struct sockaddr *)&addr, sizeof(struct sockaddr_storage));
LWIP_ASSERT("ret == 0", ret == 0);
while (sys_now() < max_time) {
int closed;
int mode = sockets_stresstest_rand_mode(0);
int mode = sockets_stresstest_rand_mode(0, do_rx);
int timeout_ms = LWIP_RAND() % TEST_MAX_RXWAIT_MS;
ret = sockets_stresstest_wait_readable(mode, s, timeout_ms);
if (ret) {
/* read some */
LWIP_ASSERT("readable", ret == TEST_SOCK_READABLE);
rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli");
LWIP_ASSERT("client got closed", !closed);
if (do_rx) {
/* read some */
LWIP_ASSERT("readable", ret == TEST_SOCK_READABLE);
rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli");
LWIP_ASSERT("client got closed", !closed);
}
} else {
/* timeout, send some */
size_t send_len = (LWIP_RAND() % (sizeof(txbuf) - 4)) + 4;
fill_test_data(txbuf, send_len);
LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_TRACE, ("cli %d tx %d\n", s, (int)send_len));
ret = lwip_write(s, txbuf, send_len);
if (ret == -1) {
/* TODO: for this to work, 'errno' has to support multithreading... */
int err = errno;
LWIP_ASSERT("err == 0", err == 0);
}
LWIP_ASSERT("ret == send_len", ret == (int)send_len);
}
}
if (data) {
data->closed = 1;
}
ret = lwip_close(s);
LWIP_ASSERT("ret == 0", ret == 0);
{
SYS_ARCH_DECL_PROTECT(lev);
SYS_ARCH_PROTECT(lev);
LWIP_ASSERT("", sockets_stresstest_numthreads > 0);
sockets_stresstest_numthreads--;
SYS_ARCH_UNPROTECT(lev);
}
SYS_ARCH_DEC(sockets_stresstest_numthreads, 1);
LWIP_ASSERT("", sockets_stresstest_numthreads >= 0);
}
static void
@ -442,7 +518,7 @@ sockets_stresstest_conn_server(void *arg)
while (1) {
int closed;
int mode = sockets_stresstest_rand_mode(1);
int mode = sockets_stresstest_rand_mode(1, 1);
int timeout_ms = LWIP_RAND() % TEST_MAX_RXWAIT_MS;
ret = sockets_stresstest_wait_readable(mode, s, timeout_ms);
if (ret) {
@ -468,22 +544,22 @@ sockets_stresstest_conn_server(void *arg)
if (err == ECONNRESET) {
break;
}
if (err == ENOTCONN) {
break;
}
LWIP_ASSERT("unknown error", 0);
}
LWIP_ASSERT("ret == send_len", ret == (int)send_len);
}
}
ret = lwip_close(s);
LWIP_ASSERT("ret == 0", ret == 0);
{
SYS_ARCH_DECL_PROTECT(lev);
SYS_ARCH_PROTECT(lev);
LWIP_ASSERT("", sockets_stresstest_numthreads > 0);
sockets_stresstest_numthreads--;
SYS_ARCH_UNPROTECT(lev);
}
SYS_ARCH_DEC(sockets_stresstest_numthreads, 1);
LWIP_ASSERT("", sockets_stresstest_numthreads >= 0);
}
static void
static int
sockets_stresstest_start_clients(const struct sockaddr_storage *remote_addr)
{
/* limit the number of connections */
@ -492,10 +568,11 @@ sockets_stresstest_start_clients(const struct sockaddr_storage *remote_addr)
for (i = 0; i < max_connections; i++) {
sys_thread_t t;
sockets_stresstest_numthreads++;
SYS_ARCH_INC(sockets_stresstest_numthreads, 1);
t = sys_thread_new("sockets_stresstest_conn_client", sockets_stresstest_conn_client, (void*)remote_addr, 0, 0);
LWIP_ASSERT("thread != NULL", t != 0);
}
return max_connections;
}
static void
@ -506,12 +583,14 @@ sockets_stresstest_listener(void *arg)
struct sockaddr_storage addr;
socklen_t addr_len;
struct test_settings *settings = (struct test_settings *)arg;
int num_clients, num_servers = 0;
slisten = lwip_socket(AF_INET, SOCK_STREAM, 0);
LWIP_ASSERT("slisten >= 0", slisten >= 0);
memcpy(&addr, &settings->addr, sizeof(struct sockaddr_storage));
ret = lwip_bind(slisten, (struct sockaddr *)&addr, sizeof(addr));
LWIP_ASSERT("ret == 0", ret == 0);
ret = lwip_listen(slisten, 0);
LWIP_ASSERT("ret == 0", ret == 0);
@ -520,9 +599,9 @@ sockets_stresstest_listener(void *arg)
ret = lwip_getsockname(slisten, (struct sockaddr *)&addr, &addr_len);
LWIP_ASSERT("ret == 0", ret == 0);
sockets_stresstest_start_clients(&addr);
num_clients = sockets_stresstest_start_clients(&addr);
while(1) {
while (num_servers < num_clients) {
struct sockaddr_storage aclient;
socklen_t aclient_len = sizeof(aclient);
int sclient = lwip_accept(slisten, (struct sockaddr *)&aclient, &aclient_len);
@ -530,7 +609,8 @@ sockets_stresstest_listener(void *arg)
/* using server threads */
{
sys_thread_t t;
sockets_stresstest_numthreads++;
SYS_ARCH_INC(sockets_stresstest_numthreads, 1);
num_servers++;
t = sys_thread_new("sockets_stresstest_conn_server", sockets_stresstest_conn_server, (void*)sclient, 0, 0);
LWIP_ASSERT("thread != NULL", t != 0);
}
@ -538,6 +618,39 @@ sockets_stresstest_listener(void *arg)
/* using server select */
#endif
}
LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_STATE, ("sockets_stresstest_listener: all %d connections established\n", num_clients));
/* accepted all clients */
while (sockets_stresstest_numthreads > 0) {
sys_msleep(1);
}
ret = lwip_close(slisten);
LWIP_ASSERT("ret == 0", ret == 0);
LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener: done\n"));
}
static void
sockets_stresstest_listener_loop(void *arg)
{
int i;
struct test_settings *settings = (struct test_settings *)arg;
if (settings->loop_cnt) {
for (i = 0; i < settings->loop_cnt; i++) {
LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: iteration %d\n", i));
sockets_stresstest_listener(arg);
sys_msleep(2);
}
LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: done\n"));
} else {
for (i = 0; ; i++) {
LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: iteration %d\n", i));
sockets_stresstest_listener(arg);
sys_msleep(2);
}
}
}
void
@ -550,12 +663,12 @@ sockets_stresstest_init_loopback(int addr_family)
memset(settings, 0, sizeof(struct test_settings));
#if LWIP_IPV4 && LWIP_IPV6
LWIP_ASSERT("invalid addr_family", (addr_family == AF_INET) || (addr_family == AF_INET6));
settings->addr.ss_family = (sa_family_t)addr_family;
#endif
settings->addr.ss_family = (sa_family_t)addr_family;
LWIP_UNUSED_ARG(addr_family);
settings->start_client = 1;
t = sys_thread_new("sockets_stresstest_listener", sockets_stresstest_listener, settings, 0, 0);
t = sys_thread_new("sockets_stresstest_listener_loop", sockets_stresstest_listener_loop, settings, 0, 0);
LWIP_ASSERT("thread != NULL", t != 0);
}