bug #28775 (select/event_callback: only check select_cb_list on change) plus use SYS_LIGHTWEIGHT_PROT to protect the select code. This should speed up receiving data on sockets as the select code in event_callback is only executed when select is waiting.

goldsimon 2010-03-07 18:40:54 +00:00
parent 9e37d70163
commit d5531a239b
2 changed files with 204 additions and 168 deletions
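For context while reading the diff below: the commit drops the old select_lock mutex in favour of short SYS_ARCH_PROTECT/SYS_ARCH_UNPROTECT critical sections, adds a per-socket select_waiting counter so event_callback can return immediately when no thread is blocked in select(), and adds a select_cb_ctr generation counter so the walk over select_cb_list can restart if the list changed while protection was briefly dropped. The following is a minimal, self-contained C sketch of that pattern, not lwIP's actual code: arch_protect()/arch_unprotect() and the simplified structs are illustrative stand-ins for SYS_ARCH_PROTECT/SYS_ARCH_UNPROTECT and the real lwip_sock/lwip_select_cb types.

/* Stand-ins for SYS_ARCH_PROTECT/SYS_ARCH_UNPROTECT: on a real port these
   disable interrupts or take a spinlock around a short critical section. */
static void arch_protect(void)   { /* platform specific */ }
static void arch_unprotect(void) { /* platform specific */ }

struct select_cb {                /* simplified lwip_select_cb */
  struct select_cb *next, *prev;
  int sem_signalled;
  /* fd sets and the semaphore are omitted in this sketch */
};

struct sock {                     /* simplified lwip_sock */
  int rcvevent;                   /* pending receive events */
  int select_waiting;             /* threads waiting on this socket in select() */
};

static struct select_cb *select_cb_list;  /* waiters, newest first */
static volatile int select_cb_ctr;        /* bumped whenever the list changes */

/* Called from the tcpip thread when a socket receives data. */
static void on_socket_event(struct sock *s)
{
  struct select_cb *scb;

  arch_protect();
  s->rcvevent++;                  /* record the event under protection */
  if (s->select_waiting == 0) {   /* fast path: nobody is blocked in select() */
    arch_unprotect();
    return;                       /* no need to touch select_cb_list at all */
  }
  arch_unprotect();

  /* Walk the waiter list, dropping protection between entries so interrupts
     stay enabled most of the time; restart if the list changed meanwhile. */
  arch_protect();
  scb = select_cb_list;
  while (scb != NULL) {
    int last_ctr;
    if (!scb->sem_signalled) {
      /* ...check whether this waiter's fd sets include 's'; if so, set
         scb->sem_signalled and signal its semaphore... */
    }
    last_ctr = select_cb_ctr;
    arch_unprotect();
    arch_protect();
    if (last_ctr != select_cb_ctr) {
      scb = select_cb_list;       /* list changed while unprotected: start over */
    } else {
      scb = scb->next;
    }
  }
  arch_unprotect();
}

The payoff is the fast path: in the common case (data arrives, nobody is blocked in select) the callback now only updates the socket's own event counters inside a short critical section instead of taking a mutex and scanning the global waiter list.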

CHANGELOG

@@ -13,6 +13,12 @@ HISTORY

   ++ New features:

+  2010-03-07: Simon Goldschmidt
+  * sockets.c: bug #28775 (select/event_callback: only check select_cb_list
+    on change) plus use SYS_LIGHTWEIGHT_PROT to protect the select code.
+    This should speed up receiving data on sockets as the select code in
+    event_callback is only executed when select is waiting.
+
   2010-03-06: Simon Goldschmidt
   * tcp_out.c: task #7013 (Create option to have all packets delivered to
     netif->output in one piece): Always copy to try to create single pbufs
src/api/sockets.c

@@ -75,12 +75,16 @@ struct lwip_sock {
   u16_t errevent;
   /** last error that occurred on this socket */
   int err;
+  /** counter of how many threads are waiting for this socket using select */
+  int select_waiting;
 };

 /** Description for a task waiting in select */
 struct lwip_select_cb {
   /** Pointer to the next waiting task */
   struct lwip_select_cb *next;
+  /** Pointer to the previous waiting task */
+  struct lwip_select_cb *prev;
   /** readset passed to select */
   fd_set *readset;
   /** writeset passed to select */
@@ -117,9 +121,9 @@ struct lwip_setgetsockopt_data {
 static struct lwip_sock sockets[NUM_SOCKETS];
 /** The global list of tasks waiting for select */
 static struct lwip_select_cb *select_cb_list;
-/** Semaphore protecting select_cb_list */
-static sys_mutex_t select_lock;
+/** This counter is increased from lwip_select when the list is changed
+    and checked in event_callback to see if it has changed. */
+static volatile int select_cb_ctr;

 /** Table to quickly map an lwIP error (err_t) to a socket error
  * by using -err as an index */
@@ -174,9 +178,6 @@ static void lwip_setsockopt_internal(void *arg);
 void
 lwip_socket_init(void)
 {
-  if(sys_mutex_new(&select_lock) != ERR_OK) {
-    LWIP_ASSERT("failed to create select_lock", 0);
-  }
 }
@@ -207,6 +208,24 @@ get_socket(int s)
   return sock;
 }

+/**
+ * Same as get_socket but doesn't set errno
+ *
+ * @param s externally used socket index
+ * @return struct lwip_sock for the socket or NULL if not found
+ */
+static struct lwip_sock *
+tryget_socket(int s)
+{
+  if ((s < 0) || (s >= NUM_SOCKETS)) {
+    return NULL;
+  }
+  if (!sockets[s].conn) {
+    return NULL;
+  }
+  return &sockets[s];
+}
+
 /**
  * Allocate a new socket for a given netconn.
  *
@@ -238,6 +257,7 @@ alloc_socket(struct netconn *newconn, int accepted)
       sockets[i].sendevent = (newconn->type == NETCONN_TCP ? (accepted != 0) : 1);
       sockets[i].errevent = 0;
       sockets[i].err = 0;
+      sockets[i].select_waiting = 0;
       return i;
     }
     SYS_ARCH_UNPROTECT(lev);
@@ -266,6 +286,7 @@ free_socket(struct lwip_sock *sock, int is_tcp)
   SYS_ARCH_PROTECT(lev);
   sock->conn = NULL;
   SYS_ARCH_UNPROTECT(lev);
+  /* don't use 'sock' after this line, as another task might have allocated it */

   if (lastdata != NULL) {
     if (is_tcp) {
@@ -929,221 +950,215 @@ lwip_write(int s, const void *data, size_t size)
  * exceptset is not used for now!!!
  *
  * @param maxfdp1 the highest socket index in the sets
- * @param readset in: set of sockets to check for read events;
- *                out: set of sockets that had read events
- * @param writeset in: set of sockets to check for write events;
- *                 out: set of sockets that had write events
- * @param exceptset not yet implemented
- * @return number of sockets that had events (read+write)
+ * @param readset_in: set of sockets to check for read events
+ * @param writeset_in: set of sockets to check for write events
+ * @param exceptset_in: set of sockets to check for error events
+ * @param readset_out: set of sockets that had read events
+ * @param writeset_out: set of sockets that had write events
+ * @param exceptset_out: set of sockets that had error events
+ * @return number of sockets that had events (read/write/exception) (>= 0)
  */
 static int
-lwip_selscan(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset)
+lwip_selscan(int maxfdp1, fd_set *readset_in, fd_set *writeset_in, fd_set *exceptset_in,
+             fd_set *readset_out, fd_set *writeset_out, fd_set *exceptset_out)
 {
   int i, nready = 0;
   fd_set lreadset, lwriteset, lexceptset;
-  struct lwip_sock *p_sock;
+  struct lwip_sock *sock;
+  SYS_ARCH_DECL_PROTECT(lev);

   FD_ZERO(&lreadset);
   FD_ZERO(&lwriteset);
   FD_ZERO(&lexceptset);

   /* Go through each socket in each list to count number of sockets which
      currently match */
   for(i = 0; i < maxfdp1; i++) {
-    if (FD_ISSET(i, readset)) {
-      /* See if netconn of this socket is ready for read */
-      p_sock = get_socket(i);
-      if (p_sock && (p_sock->lastdata || (p_sock->rcvevent > 0))) {
-        FD_SET(i, &lreadset);
-        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for reading\n", i));
-        nready++;
-      }
-    }
-    if (FD_ISSET(i, writeset)) {
-      /* See if netconn of this socket is ready for write */
-      p_sock = get_socket(i);
-      if (p_sock && p_sock->sendevent) {
-        FD_SET(i, &lwriteset);
-        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for writing\n", i));
-        nready++;
-      }
-    }
-    if (FD_ISSET(i, exceptset)) {
-      /* See if netconn of this socket had an error */
-      p_sock = get_socket(i);
-      if (p_sock && p_sock->errevent) {
-        FD_SET(i, &lexceptset);
-        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for exception\n", i));
-        nready++;
-      }
-    }
+    void* lastdata = NULL;
+    s16_t rcvevent = 0;
+    u16_t sendevent = 0;
+    u16_t errevent = 0;
+    /* First get the socket's status (protected)... */
+    SYS_ARCH_PROTECT(lev);
+    sock = tryget_socket(i);
+    if (sock != NULL) {
+      lastdata = sock->lastdata;
+      rcvevent = sock->rcvevent;
+      sendevent = sock->sendevent;
+      errevent = sock->errevent;
+    }
+    SYS_ARCH_UNPROTECT(lev);
+    /* ... then examine it: */
+    /* See if netconn of this socket is ready for read */
+    if (readset_in && FD_ISSET(i, readset_in) && ((lastdata != NULL) || (rcvevent > 0))) {
+      FD_SET(i, &lreadset);
+      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for reading\n", i));
+      nready++;
+    }
+    /* See if netconn of this socket is ready for write */
+    if (writeset_in && FD_ISSET(i, writeset_in) && (sendevent != 0)) {
+      FD_SET(i, &lwriteset);
+      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for writing\n", i));
+      nready++;
+    }
+    /* See if netconn of this socket had an error */
+    if (exceptset_in && FD_ISSET(i, exceptset_in) && (errevent != 0)) {
+      FD_SET(i, &lexceptset);
+      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_selscan: fd=%d ready for exception\n", i));
+      nready++;
+    }
   }
-  *readset = lreadset;
-  *writeset = lwriteset;
-  *exceptset = lexceptset;
-
+  /* copy local sets to the ones provided as arguments */
+  *readset_out = lreadset;
+  *writeset_out = lwriteset;
+  *exceptset_out = lexceptset;
+  LWIP_ASSERT("nready >= 0", nready >= 0);
   return nready;
 }

 /**
  * Processing exceptset is not yet implemented.
  */
 int
 lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
             struct timeval *timeout)
 {
-  u32_t waitres;
+  u32_t waitres = 0;
   int nready;
   fd_set lreadset, lwriteset, lexceptset;
   u32_t msectimeout;
   struct lwip_select_cb select_cb;
-  struct lwip_select_cb *p_selcb;
   err_t err;
+  int i;
+  SYS_ARCH_DECL_PROTECT(lev);

   LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%ld tvusec=%ld)\n",
                   maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
                   timeout ? (long)timeout->tv_sec : (long)-1,
                   timeout ? (long)timeout->tv_usec : (long)-1));

-  select_cb.next = 0;
-  select_cb.readset = readset;
-  select_cb.writeset = writeset;
-  select_cb.exceptset = exceptset;
-  select_cb.sem_signalled = 0;
-
-  /* Protect ourselves searching through the list */
-  sys_mutex_lock(&select_lock);
-
-  if (readset) {
-    lreadset = *readset;
-  } else {
-    FD_ZERO(&lreadset);
-  }
-  if (writeset) {
-    lwriteset = *writeset;
-  } else {
-    FD_ZERO(&lwriteset);
-  }
-  if (exceptset) {
-    lexceptset = *exceptset;
-  } else {
-    FD_ZERO(&lexceptset);
-  }
-
   /* Go through each socket in each list to count number of sockets which
      currently match */
-  nready = lwip_selscan(maxfdp1, &lreadset, &lwriteset, &lexceptset);
+  nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);

   /* If we don't have any current events, then suspend if we are supposed to */
   if (!nready) {
     if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
-      sys_mutex_unlock(&select_lock);
-      if (readset) {
-        FD_ZERO(readset);
-      }
-      if (writeset) {
-        FD_ZERO(writeset);
-      }
-      if (exceptset) {
-        FD_ZERO(exceptset);
-      }
       LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
-      set_errno(0);
-
-      return 0;
+      /* This is OK as the local fdsets are empty and nready is zero,
+         or we would have returned earlier. */
+      goto return_copy_fdsets;
     }

-    /* add our semaphore to list */
-    /* We don't actually need any dynamic memory. Our entry on the
-     * list is only valid while we are in this function, so it's ok
-     * to use local variables */
+    /* None ready: add our semaphore to list:
+       We don't actually need any dynamic memory. Our entry on the
+       list is only valid while we are in this function, so it's ok
+       to use local variables. */
+    select_cb.next = NULL;
+    select_cb.prev = NULL;
+    select_cb.readset = readset;
+    select_cb.writeset = writeset;
+    select_cb.exceptset = exceptset;
+    select_cb.sem_signalled = 0;
     err = sys_sem_new(&select_cb.sem, 0);
     if (err != ERR_OK) {
       /* failed to create semaphore */
-      sys_mutex_unlock(&select_lock);
       set_errno(ENOMEM);
       return -1;
     }
-    /* Note that we are still protected */

+    /* Protect the select_cb_list */
+    SYS_ARCH_PROTECT(lev);
+
     /* Put this select_cb on top of list */
     select_cb.next = select_cb_list;
+    if (select_cb_list != NULL) {
+      select_cb_list->prev = &select_cb;
+    }
     select_cb_list = &select_cb;
+    /* Increasing this counter tells event_callback that the list has changed. */
+    select_cb_ctr++;

     /* Now we can safely unprotect */
-    sys_mutex_unlock(&select_lock);
+    SYS_ARCH_UNPROTECT(lev);

-    /* Now just wait to be woken */
-    if (timeout == 0) {
-      /* Wait forever */
-      msectimeout = 0;
-    } else {
-      msectimeout = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500)/1000));
-      if(msectimeout == 0) {
-        msectimeout = 1;
+    /* Increase select_waiting for each socket we are interested in */
+    for(i = 0; i < maxfdp1; i++) {
+      if ((readset && FD_ISSET(i, readset)) ||
+          (writeset && FD_ISSET(i, writeset)) ||
+          (exceptset && FD_ISSET(i, exceptset))) {
+        struct lwip_sock *sock = tryget_socket(i);
+        LWIP_ASSERT("sock != NULL", sock != NULL);
+        SYS_ARCH_PROTECT(lev);
+        sock->select_waiting++;
+        LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
+        SYS_ARCH_UNPROTECT(lev);
       }
     }

-    waitres = sys_arch_sem_wait(&select_cb.sem, msectimeout);
-
-    /* Take us off the list */
-    sys_mutex_lock(&select_lock);
-    if (select_cb_list == &select_cb) {
-      select_cb_list = select_cb.next;
-    } else {
-      for (p_selcb = select_cb_list; p_selcb; p_selcb = p_selcb->next) {
-        if (p_selcb->next == &select_cb) {
-          p_selcb->next = select_cb.next;
-          break;
+    /* Call lwip_selscan again: there could have been events between
+       the last scan (without us on the list) and putting us on the list! */
+    nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
+    if (!nready) {
+      /* Still none ready, just wait to be woken */
+      if (timeout == 0) {
+        /* Wait forever */
+        msectimeout = 0;
+      } else {
+        msectimeout = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500)/1000));
+        if (msectimeout == 0) {
+          /* Wait 1ms at least (0 means wait forever) */
+          msectimeout = 1;
         }
       }
+
+      waitres = sys_arch_sem_wait(&select_cb.sem, msectimeout);
     }
-    sys_mutex_unlock(&select_lock);
+
+    /* Decrease select_waiting for each socket we are interested in */
+    for(i = 0; i < maxfdp1; i++) {
+      if ((readset && FD_ISSET(i, readset)) ||
+          (writeset && FD_ISSET(i, writeset)) ||
+          (exceptset && FD_ISSET(i, exceptset))) {
+        struct lwip_sock *sock = tryget_socket(i);
+        LWIP_ASSERT("sock != NULL", sock != NULL);
+        SYS_ARCH_PROTECT(lev);
+        sock->select_waiting--;
+        LWIP_ASSERT("sock->select_waiting >= 0", sock->select_waiting >= 0);
+        SYS_ARCH_UNPROTECT(lev);
+      }
+    }
+
+    /* Take us off the list */
+    SYS_ARCH_PROTECT(lev);
+    if (select_cb.next != NULL) {
+      select_cb.next->prev = select_cb.prev;
+    }
+    if (select_cb_list == &select_cb) {
+      LWIP_ASSERT("select_cb.prev == NULL", select_cb.prev == NULL);
+      select_cb_list = select_cb.next;
+    } else {
+      LWIP_ASSERT("select_cb.prev != NULL", select_cb.prev != NULL);
+      select_cb.prev->next = select_cb.next;
+    }
+    SYS_ARCH_UNPROTECT(lev);

     sys_sem_free(&select_cb.sem);
+
     if (waitres == SYS_ARCH_TIMEOUT) {
       /* Timeout */
-      if (readset) {
-        FD_ZERO(readset);
-      }
-      if (writeset) {
-        FD_ZERO(writeset);
-      }
-      if (exceptset) {
-        FD_ZERO(exceptset);
-      }
       LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
-      set_errno(0);
-
-      return 0;
-    }
-
-    if (readset) {
-      lreadset = *readset;
-    } else {
-      FD_ZERO(&lreadset);
-    }
-    if (writeset) {
-      lwriteset = *writeset;
-    } else {
-      FD_ZERO(&lwriteset);
-    }
-    if (exceptset) {
-      lexceptset = *exceptset;
-    } else {
-      FD_ZERO(&lexceptset);
+      /* This is OK as the local fdsets are empty and nready is zero,
+         or we would have returned earlier. */
+      goto return_copy_fdsets;
     }

     /* See what's set */
-    nready = lwip_selscan(maxfdp1, &lreadset, &lwriteset, &lexceptset);
-  } else {
-    sys_mutex_unlock(&select_lock);
+    nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
   }
+
+  LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
+return_copy_fdsets:
+  set_errno(0);
   if (readset) {
     *readset = lreadset;
   }
@@ -1154,8 +1169,6 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
     *exceptset = lexceptset;
   }
-  LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
-  set_errno(0);
   return nready;
 }
@@ -1170,6 +1183,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
   int s;
   struct lwip_sock *sock;
   struct lwip_select_cb *scb;
+  SYS_ARCH_DECL_PROTECT(lev);

   LWIP_UNUSED_ARG(len);
@@ -1182,7 +1196,6 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
    * Just count down (or up) if that's the case and we
    * will use the data later. Note that only receive events
    * can happen before the new socket is set up. */
-  SYS_ARCH_DECL_PROTECT(lev);
   SYS_ARCH_PROTECT(lev);
   if (conn->socket < 0) {
     if (evt == NETCONN_EVT_RCVPLUS) {
@@ -1203,7 +1216,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
     return;
   }

-  sys_mutex_lock(&select_lock);
+  SYS_ARCH_PROTECT(lev);
   /* Set event as required */
   switch (evt) {
     case NETCONN_EVT_RCVPLUS:
@@ -1225,7 +1238,14 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
       LWIP_ASSERT("unknown event", 0);
       break;
   }
-  sys_mutex_unlock(&select_lock);
+
+  if (sock->select_waiting == 0) {
+    /* no one is waiting for this socket, no need to check select_cb_list */
+    SYS_ARCH_UNPROTECT(lev);
+    return;
+  }
+  SYS_ARCH_UNPROTECT(lev);

   /* Now decide if anyone is waiting for this socket */
   /* NOTE: This code is written this way to protect the select link list
@@ -1235,8 +1255,10 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
      the list the number of waiting select calls + 1. This list is
      expected to be small. */
   while (1) {
-    sys_mutex_lock(&select_lock);
+    int last_select_cb_ctr;
+    SYS_ARCH_PROTECT(lev);
     for (scb = select_cb_list; scb; scb = scb->next) {
+      /* @todo: unprotect with each loop and check for changes? */
       if (scb->sem_signalled == 0) {
         /* Test this select call for our socket */
         if (scb->readset && FD_ISSET(s, scb->readset)) {
@@ -1245,7 +1267,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
           }
         }
         if (scb->writeset && FD_ISSET(s, scb->writeset)) {
-          if (sock->sendevent) {
+          if (sock->sendevent != 0) {
             break;
           }
         }
@@ -1255,13 +1277,21 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
           }
         }
       }
+      /* unlock interrupts with each step */
+      last_select_cb_ctr = select_cb_ctr;
+      SYS_ARCH_UNPROTECT(lev);
+      SYS_ARCH_PROTECT(lev);
+      if (last_select_cb_ctr != select_cb_ctr) {
+        /* someone has changed select_cb_list, restart at the beginning */
+        scb = select_cb_list;
+      }
     }
     if (scb) {
       scb->sem_signalled = 1;
       sys_sem_signal(&scb->sem);
-      sys_mutex_unlock(&select_lock);
+      SYS_ARCH_UNPROTECT(lev);
     } else {
-      sys_mutex_unlock(&select_lock);
+      SYS_ARCH_UNPROTECT(lev);
       break;
     }
   }
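To complete the picture from the waiter's side, here is a companion sketch reusing the stand-in names from the sketch near the top of this page. It is not the code from this commit, only an illustration of why lwip_select scans once, registers itself, then scans a second time before blocking: an event that fires between the first scan and registration would otherwise never wake the waiter, because event_callback only signals registered entries.

/* Sketch of the waiter side: scan, register, re-scan, then block. */
static int wait_for_readable(struct sock *s, struct select_cb *cb)
{
  int nready;

  /* First scan: is an event already pending? */
  nready = (s->rcvevent > 0);
  if (nready) {
    return nready;
  }

  /* Register as a waiter: link into the list, bump the generation counter
     and the per-socket waiter count, all under protection. */
  arch_protect();
  cb->sem_signalled = 0;
  cb->prev = NULL;
  cb->next = select_cb_list;
  if (select_cb_list != NULL) {
    select_cb_list->prev = cb;
  }
  select_cb_list = cb;
  select_cb_ctr++;
  s->select_waiting++;
  arch_unprotect();

  /* Second scan: catches events that fired before we were registered. */
  nready = (s->rcvevent > 0);
  if (!nready) {
    /* ...block on cb's semaphore, with the caller's timeout... */
  }

  /* Unregister: mirror of the registration above. */
  arch_protect();
  s->select_waiting--;
  if (cb->next != NULL) {
    cb->next->prev = cb->prev;
  }
  if (select_cb_list == cb) {
    select_cb_list = cb->next;
  } else {
    cb->prev->next = cb->next;
  }
  select_cb_ctr++;            /* the list changed again */
  arch_unprotect();

  return nready;
}

In this sketch the generation counter is also bumped on removal for symmetry; the commit itself bumps it when a waiter is added, which is the case the restart logic in event_callback is guarding against.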