diff --git a/CHANGELOG b/CHANGELOG index 7e218d9d..0a9dbcbc 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,6 +13,12 @@ HISTORY ++ New features: + 2010-03-07: Simon Goldschmidt + * sockets.c: bug #28775 (select/event_callback: only check select_cb_list + on change) plus use SYS_LIGHTWEIGHT_PROT to protect the select code. + This should speed up receiving data on sockets as the select code in + event_callback is only executed when select is waiting. + 2010-03-06: Simon Goldschmidt * tcp_out.c: task #7013 (Create option to have all packets delivered to netif->output in one piece): Always copy to try to create single pbufs diff --git a/src/api/sockets.c b/src/api/sockets.c index 3b551a4b..988437e8 100644 --- a/src/api/sockets.c +++ b/src/api/sockets.c @@ -75,12 +75,16 @@ struct lwip_sock { u16_t errevent; /** last error that occurred on this socket */ int err; + /** counter of how many threads are waiting for this socket using select */ + int select_waiting; }; /** Description for a task waiting in select */ struct lwip_select_cb { /** Pointer to the next waiting task */ struct lwip_select_cb *next; + /** Pointer to the previous waiting task */ + struct lwip_select_cb *prev; /** readset passed to select */ fd_set *readset; /** writeset passed to select */ @@ -117,9 +121,9 @@ struct lwip_setgetsockopt_data { static struct lwip_sock sockets[NUM_SOCKETS]; /** The global list of tasks waiting for select */ static struct lwip_select_cb *select_cb_list; - -/** Semaphore protecting select_cb_list */ -static sys_mutex_t select_lock; +/** This counter is increased from lwip_select when the list is chagned + and checked in event_callback to see if it has changed. */ +static volatile int select_cb_ctr; /** Table to quickly map an lwIP error (err_t) to a socket error * by using -err as an index */ @@ -174,9 +178,6 @@ static void lwip_setsockopt_internal(void *arg); void lwip_socket_init(void) { - if(sys_mutex_new(&select_lock) != ERR_OK) { - LWIP_ASSERT("failed to create select_lock", 0); - } } /** @@ -207,6 +208,24 @@ get_socket(int s) return sock; } +/** + * Same as get_socket but doesn't set errno + * + * @param s externally used socket index + * @return struct lwip_sock for the socket or NULL if not found + */ +static struct lwip_sock * +tryget_socket(int s) +{ + if ((s < 0) || (s >= NUM_SOCKETS)) { + return NULL; + } + if (!sockets[s].conn) { + return NULL; + } + return &sockets[s]; +} + /** * Allocate a new socket for a given netconn. * @@ -238,6 +257,7 @@ alloc_socket(struct netconn *newconn, int accepted) sockets[i].sendevent = (newconn->type == NETCONN_TCP ? (accepted != 0) : 1); sockets[i].errevent = 0; sockets[i].err = 0; + sockets[i].select_waiting = 0; return i; } SYS_ARCH_UNPROTECT(lev); @@ -266,6 +286,7 @@ free_socket(struct lwip_sock *sock, int is_tcp) SYS_ARCH_PROTECT(lev); sock->conn = NULL; SYS_ARCH_UNPROTECT(lev); + /* don't use 'sock' after this line, as another task might have allocated it */ if (lastdata != NULL) { if (is_tcp) { @@ -929,221 +950,215 @@ lwip_write(int s, const void *data, size_t size) * exceptset is not used for now!!! * * @param maxfdp1 the highest socket index in the sets - * @param readset in: set of sockets to check for read events; - * out: set of sockets that had read events - * @param writeset in: set of sockets to check for write events; - * out: set of sockets that had write events - * @param exceptset not yet implemented - * @return number of sockets that had events (read+write) + * @param readset_in: set of sockets to check for read events + * @param writeset_in: set of sockets to check for write events + * @param exceptset_in: set of sockets to check for error events + * @param readset_out: set of sockets that had read events + * @param writeset_out: set of sockets that had write events + * @param exceptset_out: set os sockets that had error events + * @return number of sockets that had events (read/write/exception) (>= 0) */ static int -lwip_selscan(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset) +lwip_selscan(int maxfdp1, fd_set *readset_in, fd_set *writeset_in, fd_set *exceptset_in, + fd_set *readset_out, fd_set *writeset_out, fd_set *exceptset_out) { int i, nready = 0; fd_set lreadset, lwriteset, lexceptset; - struct lwip_sock *p_sock; - + struct lwip_sock *sock; + SYS_ARCH_DECL_PROTECT(lev); + FD_ZERO(&lreadset); FD_ZERO(&lwriteset); FD_ZERO(&lexceptset); - + /* Go through each socket in each list to count number of sockets which - currently match */ + currently match */ for(i = 0; i < maxfdp1; i++) { - if (FD_ISSET(i, readset)) { - /* See if netconn of this socket is ready for read */ - p_sock = get_socket(i); - if (p_sock && (p_sock->lastdata || (p_sock->rcvevent > 0))) { - FD_SET(i, &lreadset); - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for reading\n", i)); - nready++; - } + void* lastdata = NULL; + s16_t rcvevent = 0; + u16_t sendevent = 0; + u16_t errevent = 0; + /* First get the socket's status (protected)... */ + SYS_ARCH_PROTECT(lev); + sock = tryget_socket(i); + if (sock != NULL) { + lastdata = sock->lastdata; + rcvevent = sock->rcvevent; + sendevent = sock->sendevent; + errevent = sock->errevent; } - if (FD_ISSET(i, writeset)) { - /* See if netconn of this socket is ready for write */ - p_sock = get_socket(i); - if (p_sock && p_sock->sendevent) { - FD_SET(i, &lwriteset); - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for writing\n", i)); - nready++; - } + SYS_ARCH_UNPROTECT(lev); + /* ... then examine it: */ + /* See if netconn of this socket is ready for read */ + if (readset_in && FD_ISSET(i, readset_in) && (lastdata != NULL) || (rcvevent > 0)) { + FD_SET(i, &lreadset); + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for reading\n", i)); + nready++; } - if (FD_ISSET(i, exceptset)) { - /* See if netconn of this socket had an error */ - p_sock = get_socket(i); - if (p_sock && p_sock->errevent) { - FD_SET(i, &lexceptset); - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for exception\n", i)); - nready++; - } + /* See if netconn of this socket is ready for write */ + if (writeset_in && FD_ISSET(i, writeset_in) && (sendevent != 0)) { + FD_SET(i, &lwriteset); + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for writing\n", i)); + nready++; + } + /* See if netconn of this socket had an error */ + if (exceptset_in && FD_ISSET(i, exceptset_in) && (errevent != 0)) { + FD_SET(i, &lexceptset); + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for exception\n", i)); + nready++; } } - *readset = lreadset; - *writeset = lwriteset; - *exceptset = lexceptset; - + /* copy local sets to the ones provided as arguments */ + *readset_out = lreadset; + *writeset_out = lwriteset; + *exceptset_out = lexceptset; + + LWIP_ASSERT("nready >= 0", nready >= 0); return nready; } - /** * Processing exceptset is not yet implemented. */ int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, - struct timeval *timeout) + struct timeval *timeout) { - u32_t waitres; + u32_t waitres = 0; int nready; fd_set lreadset, lwriteset, lexceptset; u32_t msectimeout; struct lwip_select_cb select_cb; - struct lwip_select_cb *p_selcb; err_t err; + int i; + SYS_ARCH_DECL_PROTECT(lev); LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%ld tvusec=%ld)\n", maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset, timeout ? (long)timeout->tv_sec : (long)-1, timeout ? (long)timeout->tv_usec : (long)-1)); - select_cb.next = 0; - select_cb.readset = readset; - select_cb.writeset = writeset; - select_cb.exceptset = exceptset; - select_cb.sem_signalled = 0; - - /* Protect ourselves searching through the list */ - sys_mutex_lock(&select_lock); - - if (readset) { - lreadset = *readset; - } else { - FD_ZERO(&lreadset); - } - if (writeset) { - lwriteset = *writeset; - } else { - FD_ZERO(&lwriteset); - } - if (exceptset) { - lexceptset = *exceptset; - } else { - FD_ZERO(&lexceptset); - } - /* Go through each socket in each list to count number of sockets which currently match */ - nready = lwip_selscan(maxfdp1, &lreadset, &lwriteset, &lexceptset); + nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset); /* If we don't have any current events, then suspend if we are supposed to */ if (!nready) { if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) { - sys_mutex_unlock(&select_lock); - if (readset) { - FD_ZERO(readset); - } - if (writeset) { - FD_ZERO(writeset); - } - if (exceptset) { - FD_ZERO(exceptset); - } - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n")); - set_errno(0); - - return 0; + /* This is OK as the local fdsets are empty and nready is zero, + or we would have returned earlier. */ + goto return_copy_fdsets; } - - /* add our semaphore to list */ - /* We don't actually need any dynamic memory. Our entry on the - * list is only valid while we are in this function, so it's ok - * to use local variables */ - + + /* None ready: add our semaphore to list: + We don't actually need any dynamic memory. Our entry on the + list is only valid while we are in this function, so it's ok + to use local variables. */ + + select_cb.next = NULL; + select_cb.prev = NULL; + select_cb.readset = readset; + select_cb.writeset = writeset; + select_cb.exceptset = exceptset; + select_cb.sem_signalled = 0; err = sys_sem_new(&select_cb.sem, 0); if (err != ERR_OK) { /* failed to create semaphore */ - sys_mutex_unlock(&select_lock); set_errno(ENOMEM); return -1; } - /* Note that we are still protected */ + + /* Protect the select_cb_list */ + SYS_ARCH_PROTECT(lev); + /* Put this select_cb on top of list */ select_cb.next = select_cb_list; + if (select_cb_list != NULL) { + select_cb_list->prev = &select_cb; + } select_cb_list = &select_cb; - + /* Increasing this counter tells even_callback that the list has changed. */ + select_cb_ctr++; + /* Now we can safely unprotect */ - sys_mutex_unlock(&select_lock); - - /* Now just wait to be woken */ - if (timeout == 0) { - /* Wait forever */ - msectimeout = 0; - } else { - msectimeout = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500)/1000)); - if(msectimeout == 0) { - msectimeout = 1; + SYS_ARCH_UNPROTECT(lev); + + /* Increase select_waiting for each socket we are interested in */ + for(i = 0; i < maxfdp1; i++) { + if ((readset && FD_ISSET(i, readset)) || + (writeset && FD_ISSET(i, writeset)) || + (exceptset && FD_ISSET(i, exceptset))) { + struct lwip_sock *sock = tryget_socket(i); + LWIP_ASSERT("sock != NULL", sock != NULL); + SYS_ARCH_PROTECT(lev); + sock->select_waiting++; + LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0); + SYS_ARCH_UNPROTECT(lev); } } - - waitres = sys_arch_sem_wait(&select_cb.sem, msectimeout); - - /* Take us off the list */ - sys_mutex_lock(&select_lock); - if (select_cb_list == &select_cb) { - select_cb_list = select_cb.next; - } else { - for (p_selcb = select_cb_list; p_selcb; p_selcb = p_selcb->next) { - if (p_selcb->next == &select_cb) { - p_selcb->next = select_cb.next; - break; + + /* Call lwip_selscan again: there could have been events between + the last scan (whithout us on the list) and putting us on the list! */ + nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset); + if (!nready) { + /* Still none ready, just wait to be woken */ + if (timeout == 0) { + /* Wait forever */ + msectimeout = 0; + } else { + msectimeout = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500)/1000)); + if (msectimeout == 0) { + /* Wait 1ms at least (0 means wait forever) */ + msectimeout = 1; } } + + waitres = sys_arch_sem_wait(&select_cb.sem, msectimeout); } - - sys_mutex_unlock(&select_lock); - + /* Increase select_waiting for each socket we are interested in */ + for(i = 0; i < maxfdp1; i++) { + if ((readset && FD_ISSET(i, readset)) || + (writeset && FD_ISSET(i, writeset)) || + (exceptset && FD_ISSET(i, exceptset))) { + struct lwip_sock *sock = tryget_socket(i); + LWIP_ASSERT("sock != NULL", sock != NULL); + SYS_ARCH_PROTECT(lev); + sock->select_waiting--; + LWIP_ASSERT("sock->select_waiting >= 0", sock->select_waiting >= 0); + SYS_ARCH_UNPROTECT(lev); + } + } + /* Take us off the list */ + SYS_ARCH_PROTECT(lev); + if (select_cb.next != NULL) { + select_cb.next->prev = select_cb.prev; + } + if (select_cb_list == &select_cb) { + LWIP_ASSERT("select_cb.prev == NULL", select_cb.prev == NULL); + select_cb_list = select_cb.next; + } else { + LWIP_ASSERT("select_cb.prev != NULL", select_cb.prev != NULL); + select_cb.prev->next = select_cb.next; + } + SYS_ARCH_UNPROTECT(lev); + sys_sem_free(&select_cb.sem); if (waitres == SYS_ARCH_TIMEOUT) { /* Timeout */ - if (readset) { - FD_ZERO(readset); - } - if (writeset) { - FD_ZERO(writeset); - } - if (exceptset) { - FD_ZERO(exceptset); - } - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n")); - set_errno(0); - - return 0; + /* This is OK as the local fdsets are empty and nready is zero, + or we would have returned earlier. */ + goto return_copy_fdsets; } - - if (readset) { - lreadset = *readset; - } else { - FD_ZERO(&lreadset); - } - if (writeset) { - lwriteset = *writeset; - } else { - FD_ZERO(&lwriteset); - } - if (exceptset) { - lexceptset = *exceptset; - } else { - FD_ZERO(&lexceptset); - } - + /* See what's set */ - nready = lwip_selscan(maxfdp1, &lreadset, &lwriteset, &lexceptset); - } else { - sys_mutex_unlock(&select_lock); + nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset); } - + + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready)); +return_copy_fdsets: + set_errno(0); if (readset) { *readset = lreadset; } @@ -1153,10 +1168,8 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, if (exceptset) { *exceptset = lexceptset; } - - LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready)); - set_errno(0); - + + return nready; } @@ -1170,6 +1183,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) int s; struct lwip_sock *sock; struct lwip_select_cb *scb; + SYS_ARCH_DECL_PROTECT(lev); LWIP_UNUSED_ARG(len); @@ -1182,7 +1196,6 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) * Just count down (or up) if that's the case and we * will use the data later. Note that only receive events * can happen before the new socket is set up. */ - SYS_ARCH_DECL_PROTECT(lev); SYS_ARCH_PROTECT(lev); if (conn->socket < 0) { if (evt == NETCONN_EVT_RCVPLUS) { @@ -1203,7 +1216,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) return; } - sys_mutex_lock(&select_lock); + SYS_ARCH_PROTECT(lev); /* Set event as required */ switch (evt) { case NETCONN_EVT_RCVPLUS: @@ -1225,7 +1238,14 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) LWIP_ASSERT("unknown event", 0); break; } - sys_mutex_unlock(&select_lock); + + if (sock->select_waiting == 0) { + /* noone is waiting for this socket, no need to check select_cb_list */ + SYS_ARCH_UNPROTECT(lev); + return; + } + + SYS_ARCH_UNPROTECT(lev); /* Now decide if anyone is waiting for this socket */ /* NOTE: This code is written this way to protect the select link list @@ -1235,8 +1255,10 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) the list the number of waiting select calls + 1. This list is expected to be small. */ while (1) { - sys_mutex_lock(&select_lock); + int last_select_cb_ctr; + SYS_ARCH_PROTECT(lev); for (scb = select_cb_list; scb; scb = scb->next) { + /* @todo: unprotect with each loop and check for changes? */ if (scb->sem_signalled == 0) { /* Test this select call for our socket */ if (scb->readset && FD_ISSET(s, scb->readset)) { @@ -1245,7 +1267,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) } } if (scb->writeset && FD_ISSET(s, scb->writeset)) { - if (sock->sendevent) { + if (sock->sendevent != 0) { break; } } @@ -1255,13 +1277,21 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) } } } + /* unlock interrupts with each step */ + last_select_cb_ctr = select_cb_ctr; + SYS_ARCH_UNPROTECT(lev); + SYS_ARCH_PROTECT(lev); + if (last_select_cb_ctr != select_cb_ctr) { + /* someone has changed select_cb_list, restart at the beginning */ + scb = select_cb_list; + } } if (scb) { scb->sem_signalled = 1; sys_sem_signal(&scb->sem); - sys_mutex_unlock(&select_lock); + SYS_ARCH_UNPROTECT(lev); } else { - sys_mutex_unlock(&select_lock); + SYS_ARCH_UNPROTECT(lev); break; } }