diff --git a/test/sockets/sockets_stresstest.c b/test/sockets/sockets_stresstest.c index db900420..396b2273 100644 --- a/test/sockets/sockets_stresstest.c +++ b/test/sockets/sockets_stresstest.c @@ -80,14 +80,20 @@ #define TEST_MODE_NONBLOCKING 0x04 #define TEST_MODE_WAIT 0x08 #define TEST_MODE_RECVTIMEO 0x10 +#define TEST_MODE_SLEEP 0x20 static int sockets_stresstest_numthreads; struct test_settings { struct sockaddr_storage addr; int start_client; + int loop_cnt; }; +struct sockets_stresstest_fullduplex { + int s; + volatile int closed; +}; static void fill_test_data(void *buf, size_t buf_len_bytes) @@ -156,6 +162,15 @@ recv_and_check_data_return_offset(int s, char *rxbuf, size_t rxbufsize, size_t r } *closed = 0; LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_TRACE, ("%s %d rx %d\n", dbg, s, (int)ret)); + if (ret == -1) { + /* TODO: for this to work, 'errno' has to support multithreading... */ + int err = errno; + if (err == ENOTCONN) { + *closed = 1; + return 0; + } + LWIP_ASSERT("err == 0", err == 0); + } LWIP_ASSERT("ret > 0", ret > 0); return check_test_data(rxbuf, rxoff + ret); } @@ -321,7 +336,7 @@ sockets_stresstest_wait_readable_nonblock(int s, int timeout_ms) return 0; } -static int sockets_stresstest_rand_mode(int allow_wait) +static int sockets_stresstest_rand_mode(int allow_wait, int allow_rx) { u32_t random_value = LWIP_RAND(); #if LWIP_SOCKET_SELECT @@ -334,6 +349,9 @@ static int sockets_stresstest_rand_mode(int allow_wait) return TEST_MODE_POLL; } #endif + if (!allow_rx) { + return TEST_MODE_SLEEP; + } #if LWIP_SO_RCVTIMEO if (random_value & TEST_MODE_RECVTIMEO) { return TEST_MODE_RECVTIMEO; @@ -368,6 +386,11 @@ sockets_stresstest_wait_readable(int mode, int s, int timeout_ms) return sockets_stresstest_wait_readable_wait_peek(s, timeout_ms); case TEST_MODE_NONBLOCKING: return sockets_stresstest_wait_readable_nonblock(s, timeout_ms); + case TEST_MODE_SLEEP: + { + sys_msleep(timeout_ms); + return 1; + } default: LWIP_ASSERT("invalid mode", 0); break; @@ -375,6 +398,35 @@ sockets_stresstest_wait_readable(int mode, int s, int timeout_ms) return 0; } +#if LWIP_NETCONN_FULLDUPLEX +static void +sockets_stresstest_conn_client_r(void *arg) +{ + struct sockets_stresstest_fullduplex *fd = (struct sockets_stresstest_fullduplex *)arg; + int s = fd->s; + size_t rxoff = 0; + char rxbuf[TEST_TXRX_BUFSIZE]; + + while (1) { + int closed; + if (fd->closed) { + break; + } + rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli"); + if (fd->closed) { + break; + } + if (closed) { + lwip_close(s); + break; + } + } + + SYS_ARCH_DEC(sockets_stresstest_numthreads, 1); + LWIP_ASSERT("", sockets_stresstest_numthreads >= 0); +} +#endif + static void sockets_stresstest_conn_client(void *arg) { @@ -385,6 +437,8 @@ sockets_stresstest_conn_client(void *arg) char rxbuf[TEST_TXRX_BUFSIZE]; size_t rxoff = 0; u32_t max_time = sys_now() + (TEST_TIME_SECONDS * 1000); + int do_rx = 1; + struct sockets_stresstest_fullduplex *data = NULL; memcpy(&addr, arg, sizeof(addr)); LWIP_ASSERT("", addr.ss_family == AF_INET); @@ -397,37 +451,59 @@ sockets_stresstest_conn_client(void *arg) /* connect to the server */ s = lwip_socket(addr.ss_family, SOCK_STREAM, 0); LWIP_ASSERT("s >= 0", s >= 0); + +#if LWIP_NETCONN_FULLDUPLEX + if (LWIP_RAND() % 1) { + sys_thread_t t; + data = (struct sockets_stresstest_fullduplex*)mem_malloc(sizeof(struct sockets_stresstest_fullduplex)); + LWIP_ASSERT("data != NULL", data != 0); + SYS_ARCH_INC(sockets_stresstest_numthreads, 1); + data->s = s; + data->closed = 0; + t = sys_thread_new("sockets_stresstest_conn_client_r", sockets_stresstest_conn_client_r, data, 0, 0); + LWIP_ASSERT("thread != NULL", t != 0); + do_rx = 0; + } +#endif + + /* @todo: nonblocking connect? */ ret = lwip_connect(s, (struct sockaddr *)&addr, sizeof(struct sockaddr_storage)); LWIP_ASSERT("ret == 0", ret == 0); while (sys_now() < max_time) { int closed; - int mode = sockets_stresstest_rand_mode(0); + int mode = sockets_stresstest_rand_mode(0, do_rx); int timeout_ms = LWIP_RAND() % TEST_MAX_RXWAIT_MS; ret = sockets_stresstest_wait_readable(mode, s, timeout_ms); if (ret) { - /* read some */ - LWIP_ASSERT("readable", ret == TEST_SOCK_READABLE); - rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli"); - LWIP_ASSERT("client got closed", !closed); + if (do_rx) { + /* read some */ + LWIP_ASSERT("readable", ret == TEST_SOCK_READABLE); + rxoff = recv_and_check_data_return_offset(s, rxbuf, sizeof(rxbuf), rxoff, &closed, "cli"); + LWIP_ASSERT("client got closed", !closed); + } } else { /* timeout, send some */ size_t send_len = (LWIP_RAND() % (sizeof(txbuf) - 4)) + 4; fill_test_data(txbuf, send_len); LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_TRACE, ("cli %d tx %d\n", s, (int)send_len)); ret = lwip_write(s, txbuf, send_len); + if (ret == -1) { + /* TODO: for this to work, 'errno' has to support multithreading... */ + int err = errno; + LWIP_ASSERT("err == 0", err == 0); + } LWIP_ASSERT("ret == send_len", ret == (int)send_len); } } + if (data) { + data->closed = 1; + } ret = lwip_close(s); LWIP_ASSERT("ret == 0", ret == 0); - { - SYS_ARCH_DECL_PROTECT(lev); - SYS_ARCH_PROTECT(lev); - LWIP_ASSERT("", sockets_stresstest_numthreads > 0); - sockets_stresstest_numthreads--; - SYS_ARCH_UNPROTECT(lev); - } + + SYS_ARCH_DEC(sockets_stresstest_numthreads, 1); + LWIP_ASSERT("", sockets_stresstest_numthreads >= 0); } static void @@ -442,7 +518,7 @@ sockets_stresstest_conn_server(void *arg) while (1) { int closed; - int mode = sockets_stresstest_rand_mode(1); + int mode = sockets_stresstest_rand_mode(1, 1); int timeout_ms = LWIP_RAND() % TEST_MAX_RXWAIT_MS; ret = sockets_stresstest_wait_readable(mode, s, timeout_ms); if (ret) { @@ -468,22 +544,22 @@ sockets_stresstest_conn_server(void *arg) if (err == ECONNRESET) { break; } + if (err == ENOTCONN) { + break; + } + LWIP_ASSERT("unknown error", 0); } LWIP_ASSERT("ret == send_len", ret == (int)send_len); } } ret = lwip_close(s); LWIP_ASSERT("ret == 0", ret == 0); - { - SYS_ARCH_DECL_PROTECT(lev); - SYS_ARCH_PROTECT(lev); - LWIP_ASSERT("", sockets_stresstest_numthreads > 0); - sockets_stresstest_numthreads--; - SYS_ARCH_UNPROTECT(lev); - } + + SYS_ARCH_DEC(sockets_stresstest_numthreads, 1); + LWIP_ASSERT("", sockets_stresstest_numthreads >= 0); } -static void +static int sockets_stresstest_start_clients(const struct sockaddr_storage *remote_addr) { /* limit the number of connections */ @@ -492,10 +568,11 @@ sockets_stresstest_start_clients(const struct sockaddr_storage *remote_addr) for (i = 0; i < max_connections; i++) { sys_thread_t t; - sockets_stresstest_numthreads++; + SYS_ARCH_INC(sockets_stresstest_numthreads, 1); t = sys_thread_new("sockets_stresstest_conn_client", sockets_stresstest_conn_client, (void*)remote_addr, 0, 0); LWIP_ASSERT("thread != NULL", t != 0); } + return max_connections; } static void @@ -506,12 +583,14 @@ sockets_stresstest_listener(void *arg) struct sockaddr_storage addr; socklen_t addr_len; struct test_settings *settings = (struct test_settings *)arg; + int num_clients, num_servers = 0; slisten = lwip_socket(AF_INET, SOCK_STREAM, 0); LWIP_ASSERT("slisten >= 0", slisten >= 0); memcpy(&addr, &settings->addr, sizeof(struct sockaddr_storage)); ret = lwip_bind(slisten, (struct sockaddr *)&addr, sizeof(addr)); + LWIP_ASSERT("ret == 0", ret == 0); ret = lwip_listen(slisten, 0); LWIP_ASSERT("ret == 0", ret == 0); @@ -520,9 +599,9 @@ sockets_stresstest_listener(void *arg) ret = lwip_getsockname(slisten, (struct sockaddr *)&addr, &addr_len); LWIP_ASSERT("ret == 0", ret == 0); - sockets_stresstest_start_clients(&addr); + num_clients = sockets_stresstest_start_clients(&addr); - while(1) { + while (num_servers < num_clients) { struct sockaddr_storage aclient; socklen_t aclient_len = sizeof(aclient); int sclient = lwip_accept(slisten, (struct sockaddr *)&aclient, &aclient_len); @@ -530,7 +609,8 @@ sockets_stresstest_listener(void *arg) /* using server threads */ { sys_thread_t t; - sockets_stresstest_numthreads++; + SYS_ARCH_INC(sockets_stresstest_numthreads, 1); + num_servers++; t = sys_thread_new("sockets_stresstest_conn_server", sockets_stresstest_conn_server, (void*)sclient, 0, 0); LWIP_ASSERT("thread != NULL", t != 0); } @@ -538,6 +618,39 @@ sockets_stresstest_listener(void *arg) /* using server select */ #endif } + LWIP_DEBUGF(TEST_SOCKETS_STRESS | LWIP_DBG_STATE, ("sockets_stresstest_listener: all %d connections established\n", num_clients)); + + /* accepted all clients */ + while (sockets_stresstest_numthreads > 0) { + sys_msleep(1); + } + + ret = lwip_close(slisten); + LWIP_ASSERT("ret == 0", ret == 0); + + LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener: done\n")); +} + +static void +sockets_stresstest_listener_loop(void *arg) +{ + int i; + struct test_settings *settings = (struct test_settings *)arg; + + if (settings->loop_cnt) { + for (i = 0; i < settings->loop_cnt; i++) { + LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: iteration %d\n", i)); + sockets_stresstest_listener(arg); + sys_msleep(2); + } + LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: done\n")); + } else { + for (i = 0; ; i++) { + LWIP_DEBUGF(TEST_SOCKETS_STRESS |LWIP_DBG_STATE, ("sockets_stresstest_listener_loop: iteration %d\n", i)); + sockets_stresstest_listener(arg); + sys_msleep(2); + } + } } void @@ -550,12 +663,12 @@ sockets_stresstest_init_loopback(int addr_family) memset(settings, 0, sizeof(struct test_settings)); #if LWIP_IPV4 && LWIP_IPV6 LWIP_ASSERT("invalid addr_family", (addr_family == AF_INET) || (addr_family == AF_INET6)); - settings->addr.ss_family = (sa_family_t)addr_family; #endif + settings->addr.ss_family = (sa_family_t)addr_family; LWIP_UNUSED_ARG(addr_family); settings->start_client = 1; - t = sys_thread_new("sockets_stresstest_listener", sockets_stresstest_listener, settings, 0, 0); + t = sys_thread_new("sockets_stresstest_listener_loop", sockets_stresstest_listener_loop, settings, 0, 0); LWIP_ASSERT("thread != NULL", t != 0); }