New sockets function: lwip_poll

Signed-off-by: goldsimon <goldsimon@gmx.de>
This commit is contained in:
Kalle Olavi Niemitalo 2017-09-21 21:50:36 +02:00 committed by goldsimon
parent 333f1bf2bd
commit 1152fd02c0
4 changed files with 371 additions and 15 deletions

View File

@ -290,7 +290,7 @@ static struct lwip_select_cb *select_cb_list;
#if LWIP_SOCKET_SELECT
static void event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len);
#define DEFAULT_SOCKET_EVENTCB event_callback
static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent);
static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent, struct lwip_sock *sock);
#else
#define DEFAULT_SOCKET_EVENTCB NULL
#endif
@ -1948,6 +1948,10 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
API_SELECT_CB_VAR_REF(select_cb).readset = readset;
API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_SOCKET_POLL
API_SELECT_CB_VAR_REF(select_cb).poll_fds = NULL;
API_SELECT_CB_VAR_REF(select_cb).poll_nfds = 0;
#endif
API_SELECT_CB_VAR_REF(select_cb).sem_signalled = 0;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
@ -2092,6 +2096,309 @@ lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
return nready;
}
#if LWIP_SOCKET_POLL
/** Options for the lwip_pollscan function.
 *
 * These are bit flags: LWIP_POLLSCAN_CLEAR may be combined with one of the
 * waiter-count options in a single call.
 */
enum lwip_pollscan_opts
{
/** Clear revents in each struct pollfd before scanning. */
LWIP_POLLSCAN_CLEAR = 1,
/** Increment select_waiting in each struct lwip_sock (registering as a waiter). */
LWIP_POLLSCAN_INC_WAIT = 2,
/** Decrement select_waiting in each struct lwip_sock (deregistering as a waiter). */
LWIP_POLLSCAN_DEC_WAIT = 4
};
/**
* Update revents in each struct pollfd.
* Optionally update select_waiting in struct lwip_sock.
*
* @param fds array of structures to update
* @param nfds number of structures in fds
* @param opts what to update and how
* @return number of structures that have revents != 0
*/
/**
 * Update revents in each struct pollfd.
 * Optionally update select_waiting in struct lwip_sock.
 *
 * @param fds array of structures to update
 * @param nfds number of structures in fds
 * @param opts what to update and how (see enum lwip_pollscan_opts)
 * @return number of structures that have revents != 0, or -1 on error
 *         (invalid fd encountered, or select_waiting counter overflow)
 */
static int
lwip_pollscan(struct pollfd *fds, nfds_t nfds, enum lwip_pollscan_opts opts)
{
  int nready = 0;
  nfds_t fdi;
  struct lwip_sock *sock;
  SYS_ARCH_DECL_PROTECT(lev);

  /* Go through each struct pollfd in the array. */
  for (fdi = 0; fdi < nfds; fdi++) {
    if ((opts & LWIP_POLLSCAN_CLEAR) != 0) {
      fds[fdi].revents = 0;
    }

    /* Negative fd means the caller wants us to ignore this struct.
       POLLNVAL means we already detected that the fd is invalid;
       if another thread has since opened a new socket with that fd,
       we must not use that socket. */
    if (fds[fdi].fd >= 0 && (fds[fdi].revents & POLLNVAL) == 0) {
      /* First get the socket's status (protected)... */
      SYS_ARCH_PROTECT(lev);
      sock = tryget_socket_unconn(fds[fdi].fd);
      if (sock != NULL) {
        /* Snapshot the event state under protection; the checks below
           run on these local copies so they are race-free. */
        void* lastdata = sock->lastdata.pbuf;
        s16_t rcvevent = sock->rcvevent;
        u16_t sendevent = sock->sendevent;
        u16_t errevent = sock->errevent;

        if ((opts & LWIP_POLLSCAN_INC_WAIT) != 0) {
          sock->select_waiting++;
          if (sock->select_waiting == 0) {
            /* overflow - too many threads waiting */
            sock->select_waiting--;
            nready = -1;
            /* BUGFIX: leave the critical section before done_socket():
               releasing the socket reference can tear down the netconn
               and must not run under SYS_ARCH_PROTECT. */
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
            break;
          }
        } else if ((opts & LWIP_POLLSCAN_DEC_WAIT) != 0) {
          /* for now, handle select_waiting==0... */
          LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
          if (sock->select_waiting > 0) {
            sock->select_waiting--;
          }
        }
        SYS_ARCH_UNPROTECT(lev);
        /* Release the reference taken by tryget_socket_unconn(), outside
           the critical section. A plain CLEAR scan did not release it in
           the original code either, so that behavior is preserved. */
        if ((opts & (LWIP_POLLSCAN_INC_WAIT | LWIP_POLLSCAN_DEC_WAIT)) != 0) {
          done_socket(sock);
        }

        /* ... then examine it: */
        /* See if netconn of this socket is ready for read */
        if ((fds[fdi].events & POLLIN) != 0 && ((lastdata != NULL) || (rcvevent > 0))) {
          fds[fdi].revents |= POLLIN;
          LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for reading\n", fds[fdi].fd));
        }
        /* See if netconn of this socket is ready for write */
        if ((fds[fdi].events & POLLOUT) != 0 && (sendevent != 0)) {
          fds[fdi].revents |= POLLOUT;
          LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for writing\n", fds[fdi].fd));
        }
        /* See if netconn of this socket had an error */
        if (errevent != 0) {
          /* POLLERR is output only. */
          fds[fdi].revents |= POLLERR;
          LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_pollscan: fd=%d ready for exception\n", fds[fdi].fd));
        }
      } else {
        /* Not a valid socket */
        SYS_ARCH_UNPROTECT(lev);
        /* POLLNVAL is output only. */
        fds[fdi].revents |= POLLNVAL;
        return -1;
      }
    }

    /* Will return the number of structures that have events,
       not the number of events. */
    if (fds[fdi].revents != 0) {
      nready++;
    }
  }

  LWIP_ASSERT("nready >= 0", nready >= 0);
  return nready;
}
#if LWIP_NETCONN_FULLDUPLEX
/* Mark all sockets as used.
*
* All sockets are marked (and later unmarked), whether they are open or not.
* This is OK as lwip_pollscan aborts select when non-open sockets are found.
*/
/* Mark all sockets as used.
 *
 * All sockets are marked (and later unmarked), whether they are open or not.
 * This is OK as lwip_pollscan aborts select when non-open sockets are found.
 */
static void
lwip_poll_inc_sockets_used(struct pollfd *fds, nfds_t nfds)
{
  SYS_ARCH_DECL_PROTECT(lev);

  if (fds != NULL) {
    nfds_t i;
    /* Walk the pollfd array, taking a reference on every entry. */
    for (i = 0; i < nfds; i++) {
      SYS_ARCH_PROTECT(lev);
      /* Increase the reference counter; the result is deliberately
         ignored here (see the note above about non-open sockets). */
      tryget_socket_unconn(fds[i].fd);
      SYS_ARCH_UNPROTECT(lev);
    }
  }
}
/* Let go all sockets that were marked as used when starting poll */
/* Let go all sockets that were marked as used when starting poll */
static void
lwip_poll_dec_sockets_used(struct pollfd *fds, nfds_t nfds)
{
  nfds_t i;
  struct lwip_sock *s;
  SYS_ARCH_DECL_PROTECT(lev);

  if (fds == NULL) {
    return;
  }
  /* Walk the pollfd array, dropping the reference taken at poll start. */
  for (i = 0; i < nfds; i++) {
    s = tryget_socket_unconn_nouse(fds[i].fd);
    LWIP_ASSERT("socket gone at the end of select", s != NULL);
    if (s != NULL) {
      done_socket(s);
    }
  }
}
#else /* LWIP_NETCONN_FULLDUPLEX */
#define lwip_poll_inc_sockets_used(fds, nfds)
#define lwip_poll_dec_sockets_used(fds, nfds)
#endif /* LWIP_NETCONN_FULLDUPLEX */
/**
 * Poll the given sockets for readiness (POSIX-style poll()).
 *
 * @param fds array of pollfd structures; events is input, revents is output
 * @param nfds number of entries in fds
 * @param timeout wait time in milliseconds; 0 = return immediately,
 *        negative = wait forever
 * @return number of fds with nonzero revents, 0 on timeout, -1 on error
 *
 * NOTE(review): on the -1 error paths below, errno is only set in the
 * ENOMEM case; the other returns leave errno untouched — confirm whether
 * set_errno(EBADF) should be added for POSIX conformance.
 */
int
lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
u32_t waitres = 0;
int nready;
u32_t msectimeout;
struct lwip_select_cb select_cb;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
(void*)fds, (int)nfds, timeout));
/* Keep all referenced sockets alive for the duration of this call
   (no-op unless LWIP_NETCONN_FULLDUPLEX). */
lwip_poll_inc_sockets_used(fds, nfds);
/* Go through each struct pollfd to count number of structures
which currently match */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);
if (nready < 0) {
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
/* If we don't have any current events, then suspend if we are supposed to */
if (!nready) {
if (timeout == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
goto return_success;
}
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables. */
select_cb.next = NULL;
select_cb.prev = NULL;
/* readset/writeset/exceptset stay NULL: their being NULL (and
   poll_fds being non-NULL) is how event_callback distinguishes a
   poll waiter from a select waiter. */
select_cb.readset = NULL;
select_cb.writeset = NULL;
select_cb.exceptset = NULL;
select_cb.poll_fds = fds;
select_cb.poll_nfds = nfds;
select_cb.sem_signalled = 0;
#if LWIP_NETCONN_SEM_PER_THREAD
select_cb.sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&select_cb.sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(ENOMEM);
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&select_cb);
/* Increase select_waiting for each socket we are interested in.
Also, check for events again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout < 0) {
/* Wait forever */
msectimeout = 0;
} else {
/* timeout == 0 would have been handled earlier. */
LWIP_ASSERT("timeout > 0", timeout > 0);
msectimeout = timeout;
}
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(select_cb.sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
/* Decrease select_waiting for each socket we are interested in,
and check which events occurred while we waited. */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);
lwip_unlink_select_cb(&select_cb);
#if LWIP_NETCONN_SEM_PER_THREAD
if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(select_cb.sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&select_cb.sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
goto return_success;
}
}
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
lwip_poll_dec_sockets_used(fds, nfds);
set_errno(0);
return nready;
}
/**
* Check whether event_callback should wake up a thread waiting in
* lwip_poll.
*/
/**
 * Check whether event_callback should wake up a thread waiting in
 * lwip_poll.
 *
 * @param scb select_cb entry describing the waiting poll() call
 * @param fd the socket index the event occurred on
 * @param sock the socket the event occurred on
 * @return 1 if the waiter is interested in this event, 0 otherwise
 */
static int
lwip_poll_should_wake(const struct lwip_select_cb *scb, int fd, struct lwip_sock *sock)
{
  nfds_t i;

  for (i = 0; i < scb->poll_nfds; i++) {
    const struct pollfd *pfd = &scb->poll_fds[i];
    if (pfd->fd != fd) {
      continue;
    }
    /* Do not update pfd->revents right here; that would be a data race
       because lwip_pollscan accesses revents without protecting. */
    if ((pfd->events & POLLIN) != 0 && sock->rcvevent > 0) {
      return 1;
    }
    if ((pfd->events & POLLOUT) != 0 && sock->sendevent != 0) {
      return 1;
    }
    if (sock->errevent != 0) {
      /* POLLERR is output only. */
      return 1;
    }
  }
  return 0;
}
#endif /* LWIP_SOCKET_POLL */
/**
* Callback registered in the netconn layer for each socket-netconn.
* Processes recvevent (data available) and wakes up tasks waiting for select.
@ -2181,7 +2488,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
has_errevent = sock->errevent != 0;
SYS_ARCH_UNPROTECT(lev);
/* Check any select calls waiting on this socket */
select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
select_check_waiters(s, has_recvevent, has_sendevent, has_errevent, sock);
} else {
SYS_ARCH_UNPROTECT(lev);
}
@ -2201,7 +2508,7 @@ event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
* select_cb_list during our UNPROTECT/PROTECT. We use a generational counter to
* detect this change and restart the list walk. The list is expected to be small
*/
static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent)
static void select_check_waiters(int s, int has_recvevent, int has_sendevent, int has_errevent, struct lwip_sock *sock)
{
struct lwip_select_cb *scb;
#if !LWIP_TCPIP_CORE_LOCKING
@ -2219,20 +2526,28 @@ again:
if (scb->sem_signalled == 0) {
/* semaphore not signalled yet */
int do_signal = 0;
/* Test this select call for our socket */
if (has_recvevent) {
if (scb->readset && FD_ISSET(s, scb->readset)) {
do_signal = 1;
#if LWIP_SOCKET_POLL
if (scb->poll_fds != NULL) {
do_signal = lwip_poll_should_wake(scb, s, sock);
} else
#endif /* LWIP_SOCKET_POLL */
{
LWIP_UNUSED_ARG(sock);
/* Test this select call for our socket */
if (has_recvevent) {
if (scb->readset && FD_ISSET(s, scb->readset)) {
do_signal = 1;
}
}
}
if (has_sendevent) {
if (!do_signal && scb->writeset && FD_ISSET(s, scb->writeset)) {
do_signal = 1;
if (has_sendevent) {
if (!do_signal && scb->writeset && FD_ISSET(s, scb->writeset)) {
do_signal = 1;
}
}
}
if (has_errevent) {
if (!do_signal && scb->exceptset && FD_ISSET(s, scb->exceptset)) {
do_signal = 1;
if (has_errevent) {
if (!do_signal && scb->exceptset && FD_ISSET(s, scb->exceptset)) {
do_signal = 1;
}
}
}
if (do_signal) {

View File

@ -2016,6 +2016,14 @@
#if !defined LWIP_SOCKET_SELECT || defined __DOXYGEN__
#define LWIP_SOCKET_SELECT 1
#endif
/**
* LWIP_SOCKET_POLL==1 (default): enable poll() for sockets (including
* struct pollfd, nfds_t, and constants)
*/
#if !defined LWIP_SOCKET_POLL || defined __DOXYGEN__
#define LWIP_SOCKET_POLL 1
#endif
/**
* @}
*/

View File

@ -155,6 +155,12 @@ struct lwip_select_cb {
fd_set *writeset;
/** unimplemented: exceptset passed to select */
fd_set *exceptset;
#if LWIP_SOCKET_POLL
/** fds passed to poll; NULL if select */
struct pollfd *poll_fds;
/** nfds passed to poll; 0 if select */
nfds_t poll_nfds;
#endif
/** don't signal the same semaphore twice: set to 1 when signalled */
int sem_signalled;
/** semaphore to wake up a task waiting for select */

View File

@ -486,6 +486,23 @@ typedef struct fd_set
#error "external FD_SETSIZE too small for number of sockets"
#endif /* FD_SET */
/* poll-related defines and types (only provided when the platform headers
   have not already defined them) */
/* @todo: find a better way to guard the definition of these defines and types if already defined */
#if !defined(POLLIN) && !defined(POLLOUT)
#define POLLIN 1
#define POLLOUT 2
#define POLLERR 4
#define POLLNVAL 8
/* No support for POLLPRI, POLLHUP, POLLMSG, POLLRDBAND, POLLWRBAND. */
/* Type used for the number of entries in a pollfd array. */
typedef int nfds_t;
struct pollfd
{
/* socket descriptor to poll; negative entries are ignored */
int fd;
/* requested events (input): POLLIN and/or POLLOUT */
short events;
/* returned events (output): may also include POLLERR/POLLNVAL */
short revents;
};
#endif
/** LWIP_TIMEVAL_PRIVATE: if you want to use the struct timeval provided
* by your system, set this to 0 and include <sys/time.h> in cc.h */
#ifndef LWIP_TIMEVAL_PRIVATE
@ -525,6 +542,9 @@ void lwip_socket_thread_cleanup(void); /* LWIP_NETCONN_SEM_PER_THREAD==1: destro
#if LWIP_SOCKET_SELECT
#define lwip_select select
#endif
#if LWIP_SOCKET_POLL
#define lwip_poll poll
#endif
#define lwip_ioctlsocket ioctl
#define lwip_inet_ntop inet_ntop
#define lwip_inet_pton inet_pton
@ -569,6 +589,9 @@ ssize_t lwip_writev(int s, const struct iovec *iov, int iovcnt);
int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
struct timeval *timeout);
#endif
#if LWIP_SOCKET_POLL
int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout);
#endif
int lwip_ioctl(int s, long cmd, void *argp);
int lwip_fcntl(int s, int cmd, int val);
const char *lwip_inet_ntop(int af, const void *src, char *dst, socklen_t size);
@ -614,6 +637,10 @@ int lwip_inet_pton(int af, const char *src, void *dst);
/** @ingroup socket */
#define select(maxfdp1,readset,writeset,exceptset,timeout) lwip_select(maxfdp1,readset,writeset,exceptset,timeout)
#endif
#if LWIP_SOCKET_POLL
/** @ingroup socket */
#define poll(fds,nfds,timeout) lwip_poll(fds,nfds,timeout)
#endif
/** @ingroup socket */
#define ioctlsocket(s,cmd,argp) lwip_ioctl(s,cmd,argp)
/** @ingroup socket */