Vectorize netconn_write for TCP

This commit adds support to the netconn write APIs to take an input of
vectors instead of a single data pointer

This allows vectors sent on a TCP connection via sendmsg to be treated
atomically.  The set of vectors is segmented into as much data as can
fit into the send buffer and then the TCP output function is called

Previously, each vector was passed to netconn_write_partly and tcp_write
segmented it into its own packet, which was then sent via
tcp_output (if not Nagleing).

This commit adds vector support to lwip_netconn_do_writemore() which
is the meat of the TCP write functionality from netconn/sockets layer.
A new netconn API netconn_write_vectors_partly() takes a set of vectors
as input and hooks up to do_writemore()

This commit also defines IOV_MAX because we are limited to supporting at
most 65535 vectors due to the choice of u16_t for the vector count.
This commit is contained in:
Joel Cunningham 2017-02-27 11:09:33 -06:00
parent 3feb748fee
commit 2980f7cc58
6 changed files with 118 additions and 56 deletions

View File

@ -826,13 +826,42 @@ netconn_send(struct netconn *conn, struct netbuf *buf)
err_t
netconn_write_partly(struct netconn *conn, const void *dataptr, size_t size,
                     u8_t apiflags, size_t *bytes_written)
{
  /* Describe the caller's single buffer as a one-element vector array and
     let the vectorized write path do all of the work. */
  struct netvector single;

  single.len = size;
  single.ptr = dataptr;

  return netconn_write_vectors_partly(conn, &single, 1, apiflags, bytes_written);
}
/**
* Send vectorized data atomically over a TCP netconn.
*
* @param conn the TCP netconn over which to send data
* @param vectors array of vectors containing data to send
* @param vectorcnt number of vectors in the array
* @param apiflags combination of following flags :
* - NETCONN_COPY: data will be copied into memory belonging to the stack
* - NETCONN_MORE: for TCP connection, PSH flag will be set on last segment sent
* - NETCONN_DONTBLOCK: only write the data if all data can be written at once
* @param bytes_written pointer to a location that receives the number of written bytes
* @return ERR_OK if data was sent, any other err_t on error
*/
err_t
netconn_write_vectors_partly(struct netconn *conn, struct netvector *vectors, u16_t vectorcnt,
u8_t apiflags, size_t *bytes_written)
{
API_MSG_VAR_DECLARE(msg);
err_t err;
u8_t dontblock;
size_t size;
int i;
LWIP_ERROR("netconn_write: invalid conn", (conn != NULL), return ERR_ARG;);
LWIP_ERROR("netconn_write: invalid conn->type", (NETCONNTYPE_GROUP(conn->type)== NETCONN_TCP), return ERR_VAL;);
size = 0;
for (i = 0; i < vectorcnt; i++) {
size += vectors[i].len;
}
if (size == 0) {
return ERR_OK;
}
@ -851,7 +880,9 @@ netconn_write_partly(struct netconn *conn, const void *dataptr, size_t size,
API_MSG_VAR_ALLOC(msg);
/* non-blocking write sends as much */
API_MSG_VAR_REF(msg).conn = conn;
API_MSG_VAR_REF(msg).msg.w.dataptr = dataptr;
API_MSG_VAR_REF(msg).msg.w.vector = vectors;
API_MSG_VAR_REF(msg).msg.w.vector_cnt = vectorcnt;
API_MSG_VAR_REF(msg).msg.w.vector_off = 0;
API_MSG_VAR_REF(msg).msg.w.apiflags = apiflags;
API_MSG_VAR_REF(msg).msg.w.len = size;
API_MSG_VAR_REF(msg).msg.w.offset = 0;

View File

@ -1494,6 +1494,7 @@ lwip_netconn_do_writemore(struct netconn *conn WRITE_DELAYED_PARAM)
size_t diff;
u8_t dontblock;
u8_t apiflags;
u8_t write_more;
LWIP_ASSERT("conn != NULL", conn != NULL);
LWIP_ASSERT("conn->state == NETCONN_WRITE", (conn->state == NETCONN_WRITE));
@ -1501,6 +1502,7 @@ lwip_netconn_do_writemore(struct netconn *conn WRITE_DELAYED_PARAM)
LWIP_ASSERT("conn->pcb.tcp != NULL", conn->pcb.tcp != NULL);
LWIP_ASSERT("conn->current_msg->msg.w.offset < conn->current_msg->msg.w.len",
conn->current_msg->msg.w.offset < conn->current_msg->msg.w.len);
LWIP_ASSERT("conn->current_msg->msg.w.vector_cnt > 0", conn->current_msg->msg.w.vector_cnt > 0);
apiflags = conn->current_msg->msg.w.apiflags;
dontblock = netconn_is_nonblocking(conn) || (apiflags & NETCONN_DONTBLOCK);
@ -1519,49 +1521,78 @@ lwip_netconn_do_writemore(struct netconn *conn WRITE_DELAYED_PARAM)
} else
#endif /* LWIP_SO_SNDTIMEO */
{
dataptr = (const u8_t*)conn->current_msg->msg.w.dataptr + conn->current_msg->msg.w.offset;
diff = conn->current_msg->msg.w.len - conn->current_msg->msg.w.offset;
if (diff > 0xffffUL) { /* max_u16_t */
len = 0xffff;
apiflags |= TCP_WRITE_FLAG_MORE;
} else {
len = (u16_t)diff;
}
available = tcp_sndbuf(conn->pcb.tcp);
if (available < len) {
/* don't try to write more than sendbuf */
len = available;
if (dontblock) {
if (!len) {
err = ERR_WOULDBLOCK;
goto err_mem;
}
} else {
do {
dataptr = (const u8_t*)conn->current_msg->msg.w.vector->ptr + conn->current_msg->msg.w.vector_off;
diff = conn->current_msg->msg.w.vector->len - conn->current_msg->msg.w.vector_off;
if (diff > 0xffffUL) { /* max_u16_t */
len = 0xffff;
apiflags |= TCP_WRITE_FLAG_MORE;
} else {
len = (u16_t)diff;
}
}
LWIP_ASSERT("lwip_netconn_do_writemore: invalid length!",
((conn->current_msg->msg.w.offset + len) <= conn->current_msg->msg.w.len));
err = tcp_write(conn->pcb.tcp, dataptr, len, apiflags);
available = tcp_sndbuf(conn->pcb.tcp);
if (available < len) {
/* don't try to write more than sendbuf */
len = available;
if (dontblock) {
if (!len) {
/* set error according to partial write or not */
err = (conn->current_msg->msg.w.offset == 0) ? ERR_WOULDBLOCK : ERR_OK;
goto err_mem;
}
} else {
apiflags |= TCP_WRITE_FLAG_MORE;
}
}
LWIP_ASSERT("lwip_netconn_do_writemore: invalid length!",
((conn->current_msg->msg.w.vector_off + len) <= conn->current_msg->msg.w.vector->len));
/* we should loop around for more sending in the following cases:
1) We couldn't finish the current vector because of 16-bit size limitations.
tcp_write() and tcp_sndbuf() both are limited to 16-bit sizes
2) We are sending the remainder of the current vector and have more */
if ((len == 0xffff && diff > 0xffffUL) ||
(len == (u16_t)diff && conn->current_msg->msg.w.vector_cnt > 1)) {
write_more = 1;
apiflags |= TCP_WRITE_FLAG_MORE;
} else {
write_more = 0;
}
err = tcp_write(conn->pcb.tcp, dataptr, len, apiflags);
if (err == ERR_OK) {
conn->current_msg->msg.w.offset += len;
/* update write state if making another loop */
if (write_more) {
conn->current_msg->msg.w.vector_off += len;
/* check if current vector is finished */
if (conn->current_msg->msg.w.vector_off == conn->current_msg->msg.w.vector->len) {
conn->current_msg->msg.w.vector_cnt--;
/* if we have additional vectors, move on to them */
if (conn->current_msg->msg.w.vector_cnt > 0) {
conn->current_msg->msg.w.vector++;
conn->current_msg->msg.w.vector_off = 0;
}
}
}
}
} while (write_more && err == ERR_OK);
/* if OK or memory error, check available space */
if ((err == ERR_OK) || (err == ERR_MEM)) {
err_mem:
if (dontblock && (len < conn->current_msg->msg.w.len)) {
if (dontblock && (conn->current_msg->msg.w.offset < conn->current_msg->msg.w.len)) {
/* non-blocking write did not write everything: mark the pcb non-writable
and let poll_tcp check writable space to mark the pcb writable again */
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
API_EVENT(conn, NETCONN_EVT_SENDMINUS, 0);
conn->flags |= NETCONN_FLAG_CHECK_WRITESPACE;
} else if ((tcp_sndbuf(conn->pcb.tcp) <= TCP_SNDLOWAT) ||
(tcp_sndqueuelen(conn->pcb.tcp) >= TCP_SNDQUEUELOWAT)) {
/* The queued byte- or pbuf-count exceeds the configured low-water limit,
let select mark this pcb as non-writable. */
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
API_EVENT(conn, NETCONN_EVT_SENDMINUS, 0);
}
}
if (err == ERR_OK) {
err_t out_err;
conn->current_msg->msg.w.offset += len;
if ((conn->current_msg->msg.w.offset == conn->current_msg->msg.w.len) || dontblock) {
/* return sent length (caller reads length from msg.w.offset) */
write_finished = 1;
@ -1589,8 +1620,9 @@ err_mem:
err = out_err;
write_finished = 1;
} else if (dontblock) {
/* non-blocking write is done on ERR_MEM */
err = ERR_WOULDBLOCK;
/* non-blocking write is done on ERR_MEM, set error according
to partial write or not */
err = (conn->current_msg->msg.w.offset == 0) ? ERR_WOULDBLOCK : ERR_OK;
write_finished = 1;
}
} else {

View File

@ -1061,6 +1061,8 @@ lwip_sendmsg(int s, const struct msghdr *msg, int flags)
LWIP_ERROR("lwip_sendmsg: invalid msghdr", msg != NULL,
sock_set_errno(sock, err_to_errno(ERR_ARG)); return -1;);
LWIP_ERROR("lwip_sendmsg: maximum iovs exceeded", (msg->msg_iovlen <= IOV_MAX),
sock_set_errno(sock, err_to_errno(ERR_ARG)); return -1;);
LWIP_UNUSED_ARG(msg->msg_control);
LWIP_UNUSED_ARG(msg->msg_controllen);
@ -1074,32 +1076,11 @@ lwip_sendmsg(int s, const struct msghdr *msg, int flags)
((flags & MSG_MORE) ? NETCONN_MORE : 0) |
((flags & MSG_DONTWAIT) ? NETCONN_DONTBLOCK : 0);
for (i = 0; i < msg->msg_iovlen; i++) {
u8_t apiflags = write_flags;
if (i + 1 < msg->msg_iovlen) {
apiflags |= NETCONN_MORE;
}
written = 0;
err = netconn_write_partly(sock->conn, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len, write_flags, &written);
if (err == ERR_OK) {
size += written;
/* check that the entire IO vector was accepected, if not return a partial write */
if (written != msg->msg_iov[i].iov_len)
break;
}
/* none of this IO vector was accepted, but previous was, return partial write and conceal ERR_WOULDBLOCK */
else if (err == ERR_WOULDBLOCK && size > 0) {
err = ERR_OK;
/* let ERR_WOULDBLOCK persist on the netconn since we are returning ERR_OK */
break;
} else {
size = -1;
break;
}
}
written = 0;
err = netconn_write_vectors_partly(sock->conn, (struct netvector *)msg->msg_iov, (u16_t)msg->msg_iovlen, write_flags, &written);
sock_set_errno(sock, err_to_errno(err));
done_socket(sock);
return size;
return (err == ERR_OK ? (int)written : -1);
#else /* LWIP_TCP */
sock_set_errno(sock, err_to_errno(ERR_ARG));
done_socket(sock);

View File

@ -267,6 +267,11 @@ struct netconn {
netconn_callback callback;
};
/** One contiguous piece of data to write: a read-only pointer plus its
 * length. Arrays of these are passed to netconn_write_vectors_partly().
 * NOTE(review): lwip_sendmsg() casts its msg_iov array to struct netvector *,
 * so this layout presumably has to stay compatible with struct iovec -
 * confirm before reordering or adding members. */
struct netvector {
/** start of the data (never written through) */
const void *ptr;
/** amount of data at ptr */
size_t len;
};
/** Register an Network connection event */
#define API_EVENT(c,e,l) if (c->callback) { \
(*c->callback)(c, e, l); \
@ -319,6 +324,8 @@ err_t netconn_sendto(struct netconn *conn, struct netbuf *buf,
err_t netconn_send(struct netconn *conn, struct netbuf *buf);
err_t netconn_write_partly(struct netconn *conn, const void *dataptr, size_t size,
u8_t apiflags, size_t *bytes_written);
/* Vector form of netconn_write_partly(): takes an array of vectorcnt
 * netvectors instead of a single dataptr/size pair. */
err_t netconn_write_vectors_partly(struct netconn *conn, struct netvector *vectors, u16_t vectorcnt,
u8_t apiflags, size_t *bytes_written);
/** @ingroup netconn_tcp
 * Convenience form of netconn_write_partly() that passes NULL for
 * bytes_written. */
#define netconn_write(conn, dataptr, size, apiflags) \
netconn_write_partly(conn, dataptr, size, apiflags, NULL)

View File

@ -103,10 +103,15 @@ struct api_msg {
} ad;
/** used for lwip_netconn_do_write */
struct {
const void *dataptr;
/** total length of dataptr */
/** current vector to write */
const struct netvector *vector;
/** number of unwritten vectors */
u16_t vector_cnt;
/** offset into current vector */
size_t vector_off;
/** total length across vectors */
size_t len;
/** offset into dataptr/output of bytes written when err == ERR_OK */
/** offset into total length/output of bytes written when err == ERR_OK */
size_t offset;
u8_t apiflags;
#if LWIP_SO_SNDTIMEO

View File

@ -108,6 +108,12 @@ struct sockaddr_storage {
typedef u32_t socklen_t;
#endif
/* The netconn vector count is a u16_t, so at most 0xFFFF I/O vectors can be
 * handled: supply that as the default limit when none is defined, and reject
 * a larger pre-existing IOV_MAX at compile time. */
#if !defined IOV_MAX
#define IOV_MAX 0xFFFF
#elif IOV_MAX > 0xFFFF
#error "IOV_MAX larger than supported by LwIP"
#endif /* IOV_MAX */
#if !defined(iovec)
struct iovec {
void *iov_base;