From df0699c143842e656176dfe65d89e183e697ef53 Mon Sep 17 00:00:00 2001 From: David Girault Date: Wed, 30 Jan 2019 15:53:30 +0100 Subject: [PATCH] mqtt: support mostly zero-copy message analysis also ensure no parts of message are lost because cpy_len != buffer_space! --- src/apps/mqtt/mqtt.c | 45 ++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/apps/mqtt/mqtt.c b/src/apps/mqtt/mqtt.c index fab1da8a..ae3a7577 100644 --- a/src/apps/mqtt/mqtt.c +++ b/src/apps/mqtt/mqtt.c @@ -669,13 +669,11 @@ mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) * @param remaining_length Remaining length of complete message */ static mqtt_connection_status_t -mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length) +mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length, + u8_t *var_hdr_payload) { mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; - u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_len; - size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len; - /* Control packet type */ u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); u16_t pkt_id = 0; @@ -715,6 +713,7 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { /* Should have topic and pkt id*/ + size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len; u8_t *topic; u16_t after_topic; u8_t bkp; @@ -783,6 +782,10 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u } } } else { + if (length < 2) { + LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short message\n")); + goto out_disconnect; + } /* Get packet identifier */ pkt_id = (u16_t)var_hdr_payload[0] << 8; pkt_id |= (u16_t)var_hdr_payload[1]; @@ -869,7 +872,7 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); if (msg_rem_len == 0) { /* Complete message with no extra headers of payload received */ - mqtt_message_received(client, fixed_hdr_len, 0, 0); + mqtt_message_received(client, fixed_hdr_len, 0, 0, NULL); client->msg_idx = 0; fixed_hdr_len = 0; } else { @@ -882,6 +885,8 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) } else { /* Fixed header has been parsed, parse variable header */ u16_t cpy_len, buffer_space; + u8_t *var_hdr_payload; + mqtt_connection_status_t res; /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); @@ -891,7 +896,13 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) if (cpy_len > buffer_space) { cpy_len = buffer_space; } - pbuf_copy_partial(p, client->rx_buffer + fixed_hdr_len, cpy_len, in_offset); + /* Adjust cpy_len to ensure zero-copy operation for remaining parts of current message */ + if (client->msg_idx >= MQTT_VAR_HEADER_BUFFER_LEN) { + if (cpy_len > (p->len - in_offset)) + cpy_len = p->len - in_offset; + } + var_hdr_payload = (u8_t*)pbuf_get_contiguous(p, client->rx_buffer + fixed_hdr_len, + buffer_space, cpy_len, in_offset); /* Advance get and put indexes */ client->msg_idx += cpy_len; @@ -899,18 +910,16 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) msg_rem_len -= cpy_len; LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); - if ((msg_rem_len == 0) || (cpy_len == buffer_space)) { - /* Whole message received or buffer is full */ - mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len); - if (res != MQTT_CONNECT_ACCEPTED) { - return res; - } - if (msg_rem_len == 0) { - /* Reset parser state */ - client->msg_idx = 0; - /* msg_tot_len = 0; */ - fixed_hdr_len = 0; - } + /* Whole or partial message received */ + res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len, var_hdr_payload); + if (res != MQTT_CONNECT_ACCEPTED) { + return res; + } + if (msg_rem_len == 0) { + /* Reset parser state */ + client->msg_idx = 0; + /* msg_tot_len = 0; */ + fixed_hdr_len = 0; } } }