mqtt: remove bad assert in mqtt_message_received()

- client->msg_idx can be > MQTT_VAR_HEADER_BUFFER_LEN in long message splitted in multiple pbufs
- renamed fixed_hdr_idx to fixed_hdr_len because it is length of fixed header in rx_buffer, not an index to it
- removed the cpy_start as data always copied right after the fixed header
This commit is contained in:
David Girault 2019-01-30 11:36:51 +01:00 committed by Simon Goldschmidt
parent ed561a578b
commit 2cc420e434

View File

@ -664,25 +664,24 @@ mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
/** /**
* Complete MQTT message received or buffer full * Complete MQTT message received or buffer full
* @param client MQTT client * @param client MQTT client
* @param fixed_hdr_idx header index * @param fixed_hdr_len length of fixed header
* @param length length received part * @param length length received part
* @param remaining_length Remaining length of complete message * @param remaining_length Remaining length of complete message
*/ */
static mqtt_connection_status_t static mqtt_connection_status_t
mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length)
{ {
mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_len;
size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx; size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len;
/* Control packet type */ /* Control packet type */
u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
u16_t pkt_id = 0; u16_t pkt_id = 0;
LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN); LWIP_ASSERT("fixed_hdr_len <= client->msg_idx", fixed_hdr_len <= client->msg_idx);
LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx); LWIP_ERROR("buffer length mismatch", fixed_hdr_len + length <= MQTT_VAR_HEADER_BUFFER_LEN,
LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN,
return MQTT_CONNECT_DISCONNECTED); return MQTT_CONNECT_DISCONNECTED);
if (pkt_type == MQTT_MSG_TYPE_CONNACK) { if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
@ -840,61 +839,59 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
{ {
u16_t in_offset = 0; u16_t in_offset = 0;
u32_t msg_rem_len = 0; u32_t msg_rem_len = 0;
u8_t fixed_hdr_idx = 0; u8_t fixed_hdr_len = 0;
u8_t b = 0; u8_t b = 0;
while (p->tot_len > in_offset) { while (p->tot_len > in_offset) {
/* We ALWAYS parse the header here first. Even if the header was not /* We ALWAYS parse the header here first. Even if the header was not
included in this segment, we re-parse it here by buffering it in included in this segment, we re-parse it here by buffering it in
client->rx_buffer. client->msg_idx keeps track of this. */ client->rx_buffer. client->msg_idx keeps track of this. */
if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { if ((fixed_hdr_len < 2) || ((b & 0x80) != 0)) {
if (fixed_hdr_idx < client->msg_idx) { if (fixed_hdr_len < client->msg_idx) {
/* parse header from old pbuf (buffered in client->rx_buffer) */ /* parse header from old pbuf (buffered in client->rx_buffer) */
b = client->rx_buffer[fixed_hdr_idx]; b = client->rx_buffer[fixed_hdr_len];
} else { } else {
/* parse header from this pbuf and save it in client->rx_buffer in case /* parse header from this pbuf and save it in client->rx_buffer in case
it comes in segmented */ it comes in segmented */
b = pbuf_get_at(p, in_offset++); b = pbuf_get_at(p, in_offset++);
client->rx_buffer[client->msg_idx++] = b; client->rx_buffer[client->msg_idx++] = b;
} }
fixed_hdr_idx++; fixed_hdr_len++;
if (fixed_hdr_idx >= 2) { if (fixed_hdr_len >= 2) {
/* fixed header contains at least 2 bytes but can contain more, depending on /* fixed header contains at least 2 bytes but can contain more, depending on
'remaining length'. All bytes but the last of this have 0x80 set to 'remaining length'. All bytes but the last of this have 0x80 set to
indicate more bytes are coming. */ indicate more bytes are coming. */
msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_len - 2) * 7);
if ((b & 0x80) == 0) { if ((b & 0x80) == 0) {
/* fixed header is done */ /* fixed header is done */
LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
if (msg_rem_len == 0) { if (msg_rem_len == 0) {
/* Complete message with no extra headers of payload received */ /* Complete message with no extra headers of payload received */
mqtt_message_received(client, fixed_hdr_idx, 0, 0); mqtt_message_received(client, fixed_hdr_len, 0, 0);
client->msg_idx = 0; client->msg_idx = 0;
fixed_hdr_idx = 0; fixed_hdr_len = 0;
} else { } else {
/* Bytes remaining in message (changes remaining length if this is /* Bytes remaining in message (changes remaining length if this is
not the first segment of this message) */ not the first segment of this message) */
msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; msg_rem_len = (msg_rem_len + fixed_hdr_len) - client->msg_idx;
} }
} }
} }
} else { } else {
/* Fixed header has been parsed, parse variable header */ /* Fixed header has been parsed, parse variable header */
u16_t cpy_len, cpy_start, buffer_space; u16_t cpy_len, buffer_space;
cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
/* Allow to copy the lesser one of available length in input data or bytes remaining in message */ /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
/* Limit to available space in buffer */ /* Limit to available space in buffer */
buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_len;
if (cpy_len > buffer_space) { if (cpy_len > buffer_space) {
cpy_len = buffer_space; cpy_len = buffer_space;
} }
pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset); pbuf_copy_partial(p, client->rx_buffer + fixed_hdr_len, cpy_len, in_offset);
/* Advance get and put indexes */ /* Advance get and put indexes */
client->msg_idx += cpy_len; client->msg_idx += cpy_len;
@ -904,7 +901,7 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
if ((msg_rem_len == 0) || (cpy_len == buffer_space)) { if ((msg_rem_len == 0) || (cpy_len == buffer_space)) {
/* Whole message received or buffer is full */ /* Whole message received or buffer is full */
mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len);
if (res != MQTT_CONNECT_ACCEPTED) { if (res != MQTT_CONNECT_ACCEPTED) {
return res; return res;
} }
@ -912,7 +909,7 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
/* Reset parser state */ /* Reset parser state */
client->msg_idx = 0; client->msg_idx = 0;
/* msg_tot_len = 0; */ /* msg_tot_len = 0; */
fixed_hdr_idx = 0; fixed_hdr_len = 0;
} }
} }
} }