diff --git a/src/api/sockets.c b/src/api/sockets.c index 9ebbd65c..8a7fdcaf 100644 --- a/src/api/sockets.c +++ b/src/api/sockets.c @@ -290,7 +290,7 @@ static struct lwip_select_cb *select_cb_list; #if LWIP_SOCKET_SELECT static void event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len); #define DEFAULT_SOCKET_EVENTCB event_callback -static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent); +static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent, struct lwip_sock *sock); #else #define DEFAULT_SOCKET_EVENTCB NULL #endif @@ -1948,6 +1948,10 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, API_SELECT_CB_VAR_REF(select_cb).readset = readset; API_SELECT_CB_VAR_REF(select_cb).writeset = writeset; API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset; +#if LWIP_SOCKET_POLL + API_SELECT_CB_VAR_REF(select_cb).poll_fds = NULL; + API_SELECT_CB_VAR_REF(select_cb).poll_nfds = 0; +#endif API_SELECT_CB_VAR_REF(select_cb).sem_signalled = 0; #if LWIP_NETCONN_SEM_PER_THREAD API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET(); @@ -2092,6 +2096,309 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, return nready; } +#if LWIP_SOCKET_POLL + +/** Options for the lwip_pollscan function. */ +enum lwip_pollscan_opts +{ + /** Clear revents in each struct pollfd. */ + LWIP_POLLSCAN_CLEAR = 1, + + /** Increment select_waiting in each struct lwip_sock. */ + LWIP_POLLSCAN_INC_WAIT = 2, + + /** Decrement select_waiting in each struct lwip_sock. */ + LWIP_POLLSCAN_DEC_WAIT = 4 +}; + +/** + * Update revents in each struct pollfd. + * Optionally update select_waiting in struct lwip_sock. + * + * @param fds array of structures to update + * @param nfds number of structures in fds + * @param opts what to update and how + * @return number of structures that have revents != 0 + */ +static int +lwip_pollscan(struct pollfd *fds, nfds_t nfds, enum lwip_pollscan_opts opts) +{ + int nready = 0; + nfds_t fdi; + struct lwip_sock *sock; + SYS_ARCH_DECL_PROTECT(lev); + + /* Go through each struct pollfd in the array. */ + for (fdi = 0; fdi < nfds; fdi++) { + if ((opts & LWIP_POLLSCAN_CLEAR) != 0) { + fds[fdi].revents = 0; + } + + /* Negative fd means the caller wants us to ignore this struct. + POLLNVAL means we already detected that the fd is invalid; + if another thread has since opened a new socket with that fd, + we must not use that socket. */ + if (fds[fdi].fd >= 0 && (fds[fdi].revents & POLLNVAL) == 0) { + /* First get the socket's status (protected)... */ + SYS_ARCH_PROTECT(lev); + sock = tryget_socket_unconn(fds[fdi].fd); + if (sock != NULL) { + void* lastdata = sock->lastdata.pbuf; + s16_t rcvevent = sock->rcvevent; + u16_t sendevent = sock->sendevent; + u16_t errevent = sock->errevent; + + if ((opts & LWIP_POLLSCAN_INC_WAIT) != 0) { + sock->select_waiting++; + if (sock->select_waiting == 0) { + /* overflow - too many threads waiting */ + sock->select_waiting--; + done_socket(sock); + nready = -1; + SYS_ARCH_UNPROTECT(lev); + break; + } + done_socket(sock); + } else if ((opts & LWIP_POLLSCAN_DEC_WAIT) != 0) { + /* for now, handle select_waiting==0... */ + LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0); + if (sock->select_waiting > 0) { + sock->select_waiting--; + } + done_socket(sock); + } + + SYS_ARCH_UNPROTECT(lev); + + /* ... then examine it: */ + /* See if netconn of this socket is ready for read */ + if ((fds[fdi].events & POLLIN) != 0 && ((lastdata != NULL) || (rcvevent > 0))) { + fds[fdi].revents |= POLLIN; + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for reading\n", fds[fdi].fd)); + } + /* See if netconn of this socket is ready for write */ + if ((fds[fdi].events & POLLOUT) != 0 && (sendevent != 0)) { + fds[fdi].revents |= POLLOUT; + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for writing\n", fds[fdi].fd)); + } + /* See if netconn of this socket had an error */ + if (errevent != 0) { + /* POLLERR is output only. */ + fds[fdi].revents |= POLLERR; + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for exception\n", fds[fdi].fd)); + } + } else { + /* Not a valid socket */ + SYS_ARCH_UNPROTECT(lev); + /* POLLNVAL is output only. */ + fds[fdi].revents |= POLLNVAL; + return -1; + } + } + + /* Will return the number of structures that have events, + not the number of events. */ + if (fds[fdi].revents != 0) { + nready++; + } + } + + LWIP_ASSERT("nready >= 0", nready >= 0); + return nready; +} + +#if LWIP_NETCONN_FULLDUPLEX +/* Mark all sockets as used. + * + * All sockets are marked (and later unmarked), whether they are open or not. + * This is OK as lwip_pollscan aborts select when non-open sockets are found. + */ +static void +lwip_poll_inc_sockets_used(struct pollfd *fds, nfds_t nfds) +{ + nfds_t fdi; + SYS_ARCH_DECL_PROTECT(lev); + + if(fds) { + /* Go through each struct pollfd in the array. */ + for (fdi = 0; fdi < nfds; fdi++) { + SYS_ARCH_PROTECT(lev); + /* Increase the reference counter */ + tryget_socket_unconn(fds[fdi].fd); + SYS_ARCH_UNPROTECT(lev); + } + } +} + +/* Let go all sockets that were marked as used when starting poll */ +static void +lwip_poll_dec_sockets_used(struct pollfd *fds, nfds_t nfds) +{ + nfds_t fdi; + struct lwip_sock *sock; + SYS_ARCH_DECL_PROTECT(lev); + + if(fds) { + /* Go through each struct pollfd in the array. */ + for (fdi = 0; fdi < nfds; fdi++) { + sock = tryget_socket_unconn_nouse(fds[fdi].fd); + LWIP_ASSERT("socket gone at the end of select", sock != NULL); + if (sock != NULL) { + done_socket(sock); + } + } + } +} +#else /* LWIP_NETCONN_FULLDUPLEX */ +#define lwip_poll_inc_sockets_used(fds, nfds) +#define lwip_poll_dec_sockets_used(fds, nfds) +#endif /* LWIP_NETCONN_FULLDUPLEX */ + +int +lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout) +{ + u32_t waitres = 0; + int nready; + u32_t msectimeout; + struct lwip_select_cb select_cb; +#if LWIP_NETCONN_SEM_PER_THREAD + int waited = 0; +#endif + + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n", + (void*)fds, (int)nfds, timeout)); + + lwip_poll_inc_sockets_used(fds, nfds); + + /* Go through each struct pollfd to count number of structures + which currently match */ + nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR); + + if (nready < 0) { + lwip_poll_dec_sockets_used(fds, nfds); + return -1; + } + + /* If we don't have any current events, then suspend if we are supposed to */ + if (!nready) { + if (timeout == 0) { + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n")); + goto return_success; + } + + /* None ready: add our semaphore to list: + We don't actually need any dynamic memory. Our entry on the + list is only valid while we are in this function, so it's ok + to use local variables. */ + + select_cb.next = NULL; + select_cb.prev = NULL; + select_cb.readset = NULL; + select_cb.writeset = NULL; + select_cb.exceptset = NULL; + select_cb.poll_fds = fds; + select_cb.poll_nfds = nfds; + select_cb.sem_signalled = 0; +#if LWIP_NETCONN_SEM_PER_THREAD + select_cb.sem = LWIP_NETCONN_THREAD_SEM_GET(); +#else /* LWIP_NETCONN_SEM_PER_THREAD */ + if (sys_sem_new(&select_cb.sem, 0) != ERR_OK) { + /* failed to create semaphore */ + set_errno(ENOMEM); + lwip_poll_dec_sockets_used(fds, nfds); + return -1; + } +#endif /* LWIP_NETCONN_SEM_PER_THREAD */ + + lwip_link_select_cb(&select_cb); + + /* Increase select_waiting for each socket we are interested in. + Also, check for events again: there could have been events between + the last scan (without us on the list) and putting us on the list! */ + nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT); + + if (!nready) { + /* Still none ready, just wait to be woken */ + if (timeout < 0) { + /* Wait forever */ + msectimeout = 0; + } else { + /* timeout == 0 would have been handled earlier. */ + LWIP_ASSERT("timeout > 0", timeout > 0); + msectimeout = timeout; + } + waitres = sys_arch_sem_wait(SELECT_SEM_PTR(select_cb.sem), msectimeout); +#if LWIP_NETCONN_SEM_PER_THREAD + waited = 1; +#endif + } + + /* Decrease select_waiting for each socket we are interested in, + and check which events occurred while we waited. */ + nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT); + + lwip_unlink_select_cb(&select_cb); + +#if LWIP_NETCONN_SEM_PER_THREAD + if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) { + /* don't leave the thread-local semaphore signalled */ + sys_arch_sem_wait(select_cb.sem, 1); + } +#else /* LWIP_NETCONN_SEM_PER_THREAD */ + sys_sem_free(&select_cb.sem); +#endif /* LWIP_NETCONN_SEM_PER_THREAD */ + + if (nready < 0) { + /* This happens when a socket got closed while waiting */ + lwip_poll_dec_sockets_used(fds, nfds); + return -1; + } + + if (waitres == SYS_ARCH_TIMEOUT) { + /* Timeout */ + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n")); + goto return_success; + } + } + + LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready)); +return_success: + lwip_poll_dec_sockets_used(fds, nfds); + set_errno(0); + return nready; +} + +/** + * Check whether event_callback should wake up a thread waiting in + * lwip_poll. + */ +static int +lwip_poll_should_wake(const struct lwip_select_cb *scb, int fd, struct lwip_sock *sock) +{ + nfds_t fdi; + for (fdi = 0; fdi < scb->poll_nfds; fdi++) { + const struct pollfd *pollfd = &scb->poll_fds[fdi]; + if (pollfd->fd == fd) { + /* Do not update pollfd->revents right here; + that would be a data race because lwip_pollscan + accesses revents without protecting. */ + if (sock->rcvevent > 0 && (pollfd->events & POLLIN) != 0) { + return 1; + } + if (sock->sendevent != 0 && (pollfd->events & POLLOUT) != 0) { + return 1; + } + if (sock->errevent != 0) { + /* POLLERR is output only. */ + return 1; + } + } + } + return 0; +} + +#endif /* LWIP_SOCKET_POLL */ + /** * Callback registered in the netconn layer for each socket-netconn. * Processes recvevent (data available) and wakes up tasks waiting for select. @@ -2181,7 +2488,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) has_errevent = sock->errevent != 0; SYS_ARCH_UNPROTECT(lev); /* Check any select calls waiting on this socket */ - select_check_waiters(s, has_recvevent, has_sendevent, has_errevent); + select_check_waiters(s, has_recvevent, has_sendevent, has_errevent, sock); } else { SYS_ARCH_UNPROTECT(lev); } @@ -2201,7 +2508,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len) * select_cb_list during our UNPROTECT/PROTECT. We use a generational counter to * detect this change and restart the list walk. The list is expected to be small */ -static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent) +static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent, struct lwip_sock *sock) { struct lwip_select_cb *scb; #if !LWIP_TCPIP_CORE_LOCKING @@ -2219,20 +2526,28 @@ again: if (scb->sem_signalled == 0) { /* semaphore not signalled yet */ int do_signal = 0; - /* Test this select call for our socket */ - if (has_recvevent) { - if (scb->readset && FD_ISSET(s, scb->readset)) { - do_signal = 1; +#if LWIP_SOCKET_POLL + if (scb->poll_fds != NULL) { + do_signal = lwip_poll_should_wake(scb, s, sock); + } else +#endif /* LWIP_SOCKET_POLL */ + { + LWIP_UNUSED_ARG(sock); + /* Test this select call for our socket */ + if (has_recvevent) { + if (scb->readset && FD_ISSET(s, scb->readset)) { + do_signal = 1; + } } - } - if (has_sendevent) { - if (!do_signal && scb->writeset && FD_ISSET(s, scb->writeset)) { - do_signal = 1; + if (has_sendevent) { + if (!do_signal && scb->writeset && FD_ISSET(s, scb->writeset)) { + do_signal = 1; + } } - } - if (has_errevent) { - if (!do_signal && scb->exceptset && FD_ISSET(s, scb->exceptset)) { - do_signal = 1; + if (has_errevent) { + if (!do_signal && scb->exceptset && FD_ISSET(s, scb->exceptset)) { + do_signal = 1; + } } } if (do_signal) { diff --git a/src/include/lwip/opt.h b/src/include/lwip/opt.h index 8d954764..420985ca 100644 --- a/src/include/lwip/opt.h +++ b/src/include/lwip/opt.h @@ -2016,6 +2016,14 @@ #if !defined LWIP_SOCKET_SELECT || defined __DOXYGEN__ #define LWIP_SOCKET_SELECT 1 #endif + +/** + * LWIP_SOCKET_POLL==1 (default): enable poll() for sockets (including + * struct pollfd, nfds_t, and constants) + */ +#if !defined LWIP_SOCKET_POLL || defined __DOXYGEN__ +#define LWIP_SOCKET_POLL 1 +#endif /** * @} */ diff --git a/src/include/lwip/priv/sockets_priv.h b/src/include/lwip/priv/sockets_priv.h index 3628a070..22075e15 100644 --- a/src/include/lwip/priv/sockets_priv.h +++ b/src/include/lwip/priv/sockets_priv.h @@ -155,6 +155,12 @@ struct lwip_select_cb { fd_set *writeset; /** unimplemented: exceptset passed to select */ fd_set *exceptset; +#if LWIP_SOCKET_POLL + /** fds passed to poll; NULL if select */ + struct pollfd *poll_fds; + /** nfds passed to poll; 0 if select */ + nfds_t poll_nfds; +#endif /** don't signal the same semaphore twice: set to 1 when signalled */ int sem_signalled; /** semaphore to wake up a task waiting for select */ diff --git a/src/include/lwip/sockets.h b/src/include/lwip/sockets.h index 5387b4f4..71c36b63 100644 --- a/src/include/lwip/sockets.h +++ b/src/include/lwip/sockets.h @@ -486,6 +486,23 @@ typedef struct fd_set #error "external FD_SETSIZE too small for number of sockets" #endif /* FD_SET */ +/* poll-related defines and types */ +/* @todo: find a better way to guard the definition of these defines and types if already defined */ +#if !defined(POLLIN) && !defined(POLLOUT) +#define POLLIN 1 +#define POLLOUT 2 +#define POLLERR 4 +#define POLLNVAL 8 +/* No support for POLLPRI, POLLHUP, POLLMSG, POLLRDBAND, POLLWRBAND. */ +typedef int nfds_t; +struct pollfd +{ + int fd; + short events; + short revents; +}; +#endif + /** LWIP_TIMEVAL_PRIVATE: if you want to use the struct timeval provided * by your system, set this to 0 and include in cc.h */ #ifndef LWIP_TIMEVAL_PRIVATE @@ -525,6 +542,9 @@ void lwip_socket_thread_cleanup(void); /* LWIP_NETCONN_SEM_PER_THREAD==1: destro #if LWIP_SOCKET_SELECT #define lwip_select select #endif +#if LWIP_SOCKET_POLL +#define lwip_poll poll +#endif #define lwip_ioctlsocket ioctl #define lwip_inet_ntop inet_ntop #define lwip_inet_pton inet_pton @@ -569,6 +589,9 @@ ssize_t lwip_writev(int s, const struct iovec *iov, int iovcnt); int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout); #endif +#if LWIP_SOCKET_POLL +int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout); +#endif int lwip_ioctl(int s, long cmd, void *argp); int lwip_fcntl(int s, int cmd, int val); const char *lwip_inet_ntop(int af, const void *src, char *dst, socklen_t size); @@ -614,6 +637,10 @@ int lwip_inet_pton(int af, const char *src, void *dst); /** @ingroup socket */ #define select(maxfdp1,readset,writeset,exceptset,timeout) lwip_select(maxfdp1,readset,writeset,exceptset,timeout) #endif +#if LWIP_SOCKET_POLL +/** @ingroup socket */ +#define poll(fds,nfds,timeout) lwip_poll(fds,nfds,timeout) +#endif /** @ingroup socket */ #define ioctlsocket(s,cmd,argp) lwip_ioctl(s,cmd,argp) /** @ingroup socket */