mqtt: support mostly zero-copy message analysis

also ensure no parts of message are lost because cpy_len != buffer_space!
This commit is contained in:
David Girault 2019-01-30 15:53:30 +01:00 committed by Dirk Ziegelmeier
parent 156bb74d17
commit df0699c143

View File

@ -669,13 +669,11 @@ mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
* @param remaining_length Remaining length of complete message * @param remaining_length Remaining length of complete message
*/ */
static mqtt_connection_status_t static mqtt_connection_status_t
mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length) mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length,
u8_t *var_hdr_payload)
{ {
mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_len;
size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len;
/* Control packet type */ /* Control packet type */
u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
u16_t pkt_id = 0; u16_t pkt_id = 0;
@ -715,6 +713,7 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u
if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
/* Should have topic and pkt id*/ /* Should have topic and pkt id*/
size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len;
u8_t *topic; u8_t *topic;
u16_t after_topic; u16_t after_topic;
u8_t bkp; u8_t bkp;
@ -783,6 +782,10 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u
} }
} }
} else { } else {
if (length < 2) {
LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short message\n"));
goto out_disconnect;
}
/* Get packet identifier */ /* Get packet identifier */
pkt_id = (u16_t)var_hdr_payload[0] << 8; pkt_id = (u16_t)var_hdr_payload[0] << 8;
pkt_id |= (u16_t)var_hdr_payload[1]; pkt_id |= (u16_t)var_hdr_payload[1];
@ -869,7 +872,7 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
if (msg_rem_len == 0) { if (msg_rem_len == 0) {
/* Complete message with no extra headers of payload received */ /* Complete message with no extra headers of payload received */
mqtt_message_received(client, fixed_hdr_len, 0, 0); mqtt_message_received(client, fixed_hdr_len, 0, 0, NULL);
client->msg_idx = 0; client->msg_idx = 0;
fixed_hdr_len = 0; fixed_hdr_len = 0;
} else { } else {
@ -882,6 +885,8 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
} else { } else {
/* Fixed header has been parsed, parse variable header */ /* Fixed header has been parsed, parse variable header */
u16_t cpy_len, buffer_space; u16_t cpy_len, buffer_space;
u8_t *var_hdr_payload;
mqtt_connection_status_t res;
/* Allow to copy the lesser one of available length in input data or bytes remaining in message */ /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
@ -891,7 +896,13 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
if (cpy_len > buffer_space) { if (cpy_len > buffer_space) {
cpy_len = buffer_space; cpy_len = buffer_space;
} }
pbuf_copy_partial(p, client->rx_buffer + fixed_hdr_len, cpy_len, in_offset); /* Adjust cpy_len to ensure zero-copy operation for remaining parts of current message */
if (client->msg_idx >= MQTT_VAR_HEADER_BUFFER_LEN) {
if (cpy_len > (p->len - in_offset))
cpy_len = p->len - in_offset;
}
var_hdr_payload = (u8_t*)pbuf_get_contiguous(p, client->rx_buffer + fixed_hdr_len,
buffer_space, cpy_len, in_offset);
/* Advance get and put indexes */ /* Advance get and put indexes */
client->msg_idx += cpy_len; client->msg_idx += cpy_len;
@ -899,18 +910,16 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
msg_rem_len -= cpy_len; msg_rem_len -= cpy_len;
LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
if ((msg_rem_len == 0) || (cpy_len == buffer_space)) { /* Whole or partial message received */
/* Whole message received or buffer is full */ res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len, var_hdr_payload);
mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len); if (res != MQTT_CONNECT_ACCEPTED) {
if (res != MQTT_CONNECT_ACCEPTED) { return res;
return res; }
} if (msg_rem_len == 0) {
if (msg_rem_len == 0) { /* Reset parser state */
/* Reset parser state */ client->msg_idx = 0;
client->msg_idx = 0; /* msg_tot_len = 0; */
/* msg_tot_len = 0; */ fixed_hdr_len = 0;
fixed_hdr_len = 0;
}
} }
} }
} }