diff --git a/src/apps/mqtt/mqtt.c b/src/apps/mqtt/mqtt.c index e36e33a4..5addb2fc 100644 --- a/src/apps/mqtt/mqtt.c +++ b/src/apps/mqtt/mqtt.c @@ -2,6 +2,8 @@ * @file * MQTT client * + * @defgroup mqtt MQTT Client + * @ingroup apps */ /* @@ -51,7 +53,7 @@ #include "lwip/err.h" #include "lwip/pbuf.h" #include "lwip/tcp.h" -#include "mqtt.h" +#include "lwip/apps/mqtt.h" /** * MQTT_DEBUG: Default is off. @@ -139,7 +141,7 @@ enum mqtt_connect_flag { }; -#if (MQTT_DEBUG == LWIP_DBG_ON) && defined(LWIP_DEBUG) +#if defined(LWIP_DEBUG) static const char * const mqtt_message_type_str[15] = { "UNDEFINED", @@ -223,11 +225,11 @@ msg_generate_packet_id(mqtt_client_t *client) static void mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb) { - LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); err_t err; u8_t wrap = 0; u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); u16_t send_len = tcp_sndbuf(tpcb); + LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); if(send_len == 0 || ringbuf_lin_len == 0) { return; @@ -275,9 +277,9 @@ mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb) static struct mqtt_request_t * mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) { - LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); struct mqtt_request_t *r = NULL; u8_t n; + LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT && r == NULL; n++) { /* Item point to itself if not in use */ if(r_objs[n].next == &r_objs[n]) { @@ -300,12 +302,12 @@ mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb static void mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) { - LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); - struct mqtt_request_t *head = NULL; s16_t time_before = 0; struct mqtt_request_t *iter = *tail; + LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); + /* Iterate trough queue to find head, and count total timeout time */ for(iter = *tail; iter != NULL; iter = iter->next) { time_before += iter->timeout_diff; @@ -343,8 +345,8 @@ mqtt_delete_request(struct mqtt_request_t *r) static struct mqtt_request_t * mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) { - LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); struct mqtt_request_t *iter = NULL, *prev = NULL; + LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); /* Search all request for pkt_id */ for(iter = *tail; iter != NULL; iter = iter->next) { if(iter->pkt_id == pkt_id) { @@ -378,8 +380,8 @@ mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) static void mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) { - LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); struct mqtt_request_t *r = *tail; + LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); while(t > 0 && r != NULL) { if(t >= r->timeout_diff) { t -= r->timeout_diff; @@ -406,8 +408,8 @@ mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) static void mqtt_clear_requests(struct mqtt_request_t **tail) { - LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); struct mqtt_request_t *iter, *next; + LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); for(iter = *tail; iter != NULL; iter = next) { next = iter->next; mqtt_delete_request(iter); @@ -421,8 +423,8 @@ mqtt_clear_requests(struct mqtt_request_t **tail) static void mqtt_init_requests(struct mqtt_request_t *r_objs) { - LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); u8_t n; + LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) { /* Item pointing to itself indicates unused */ r_objs[n].next = &r_objs[n]; @@ -442,7 +444,6 @@ mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) static void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) { - mqtt_ringbuf_put(rb, value >> 8); mqtt_ringbuf_put(rb, value & 0xff); } @@ -459,9 +460,9 @@ mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length static void mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) { + u16_t n; mqtt_ringbuf_put(rb, length >> 8); mqtt_ringbuf_put(rb, length & 0xff); - u16_t n; for(n=0; n < length; n++) { mqtt_ringbuf_put(rb, str[n]); } @@ -481,7 +482,6 @@ static void mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup, u8_t qos, u8_t retain, u16_t r_length) { - /* Start with control byte */ mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1))); /* Encode remaining length field */ @@ -501,11 +501,11 @@ mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t d static u8_t mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) { - LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); - /* Start with length of type byte + remaining length */ u16_t total_len = 1 + r_length; + LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); + /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ do { total_len++; @@ -670,7 +670,7 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u if(pkt_type == MQTT_MSG_TYPE_CONNACK) { if(client->conn_state == MQTT_CONNECTING) { /* Get result code from CONNACK */ - res = var_hdr_payload[1]; + res = (mqtt_connection_status_t)var_hdr_payload[1]; LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res)); if(res == MQTT_CONNECT_ACCEPTED) { /* Reset cyclic_tick when changing to connected state */ @@ -918,9 +918,10 @@ mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) static err_t mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len) { + mqtt_client_t *client = (mqtt_client_t *)arg; + LWIP_UNUSED_ARG(tpcb); LWIP_UNUSED_ARG(len); - mqtt_client_t *client = (mqtt_client_t *)arg; if(client->conn_state == MQTT_CONNECTED) { @@ -984,11 +985,12 @@ mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb) static err_t mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err) { + mqtt_client_t* client = (mqtt_client_t *)arg; + if(err != ERR_OK) { LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); return err; } - mqtt_client_t* client = (mqtt_client_t *)arg; /* Initiate receiver state */ client->msg_idx = 0; @@ -1025,6 +1027,7 @@ mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err) * @param payload Data to publish (NULL is allowed) * @param payload_length: Length of payload (0 is allowed) * @param qos Quality of service, 0 1 or 2 + * @param retain MQTT retain flag * @param cb Callback to call when publish is complete or has timed out * @param arg User supplied argument to publish callback * @return ERR_OK if successful @@ -1035,17 +1038,17 @@ err_t mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, mqtt_request_cb_t cb, void *arg) { + struct mqtt_request_t *r; + u16_t pkt_id; + u16_t topic_len = strlen(topic); + u16_t remaining_length = 2 + topic_len + payload_length; + LWIP_ASSERT("mqtt_publish: client != NULL", client); LWIP_ASSERT("mqtt_publish: topic != NULL", topic); LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); - struct mqtt_request_t *r; - u16_t pkt_id; - u16_t topic_len = strlen(topic); - u16_t remaining_length = 2 + topic_len + payload_length; - if(qos > 0) { remaining_length += 2; /* Generate pkt_id id for QoS1 and 2 */ @@ -1099,6 +1102,13 @@ mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_ err_t mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) { + u16_t topic_len = strlen(topic); + /* Topic string, pkt_id, qos for subscribe */ + u16_t remaining_length = topic_len + 2 + 2 + (sub != 0); + + u16_t pkt_id = msg_generate_packet_id(client); + struct mqtt_request_t *r = mqtt_create_request(client->req_list, pkt_id, cb, arg); + LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); @@ -1107,12 +1117,6 @@ mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_ return ERR_CONN; } - u16_t topic_len = strlen(topic); - /* Topic string, pkt_id, qos for subscribe */ - u16_t remaining_length = topic_len + 2 + 2 + (sub != 0); - - u16_t pkt_id = msg_generate_packet_id(client); - struct mqtt_request_t *r = mqtt_create_request(client->req_list, pkt_id, cb, arg); if(r == NULL) { return ERR_MEM; } @@ -1159,7 +1163,6 @@ mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb /** * Create a new MQTT client instance - * @param client MQTT client * @return Pointer to instance on success, NULL otherwise */ mqtt_client_t * @@ -1186,6 +1189,12 @@ err_t mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_t cb, void *arg, const struct mqtt_connect_client_info_t *client_info) { + err_t err; + ip_addr_t ip_addr; + u16_t port = 1883; + /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ + u16_t remaining_length = 2 + 4 + 1 + 1 + 2; + u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); LWIP_ASSERT("mqtt_client_connect: host != NULL", host != NULL); @@ -1196,9 +1205,6 @@ mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_ LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n")); return ERR_ISCONN; } - err_t err; - ip_addr_t ip_addr; - u16_t port = 1883; if(ipaddr_aton(host, &ip_addr) == 0) { LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Illegal hostname: %s\n", host)); @@ -1212,10 +1218,6 @@ mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_ mqtt_init_requests(client->req_list); /* Build connect message */ - /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ - u16_t remaining_length = 2 + 4 + 1 + 1 + 2; - u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; - if(client_info->will_topic != NULL && client_info->will_msg != NULL) { flags |= MQTT_CONNECT_FLAG_WILL; flags |= (client_info->will_qos & 3) << 3; @@ -1284,7 +1286,6 @@ tcp_fail: tcp_abort(client->conn); client->conn = NULL; return err; - } @@ -1300,7 +1301,7 @@ mqtt_disconnect(mqtt_client_t *client) if(client->conn_state != TCP_DISCONNECTED) { /* Set conn_state before calling mqtt_close to prevent callback from being called */ client->conn_state = TCP_DISCONNECTED; - mqtt_close(client, 0); + mqtt_close(client, (mqtt_connection_status_t)0); } } diff --git a/src/include/lwip/apps/mqtt.h b/src/include/lwip/apps/mqtt.h index 3b27bb08..e5cd3660 100644 --- a/src/include/lwip/apps/mqtt.h +++ b/src/include/lwip/apps/mqtt.h @@ -44,6 +44,10 @@ extern "C" { #endif +/** @ingroup mqtt + * @{ + */ + /** * Output ring-buffer size, must be able to fit largest outgoing publish message topic+payloads */ @@ -173,39 +177,42 @@ struct mqtt_request_t u16_t timeout_diff; }; +/** Ring buffer */ +struct mqtt_ringbuf_t { + u16_t put; + u16_t get; + u8_t buf[MQTT_OUTPUT_RINGBUF_SIZE]; +}; + /** MQTT client */ struct mqtt_client_t { - /** Timers and timeouts */ - u16_t cyclic_tick; + /** Timers and timeouts */ + u16_t cyclic_tick; u16_t keep_alive; u16_t server_watchdog; - /** Packet identifier generator*/ - u16_t pkt_id_seq; + /** Packet identifier generator*/ + u16_t pkt_id_seq; /** Packet identifier of pending incoming publish */ u16_t inpub_pkt_id; /** Connection state */ u8_t conn_state; struct tcp_pcb *conn; - /** Connection callback */ - void *connect_arg; - mqtt_connection_cb_t connect_cb; + /** Connection callback */ + void *connect_arg; + mqtt_connection_cb_t connect_cb; /** Pending requests to server */ struct mqtt_request_t *pend_req_queue; struct mqtt_request_t req_list[MQTT_REQ_MAX_IN_FLIGHT]; - void *inpub_arg; - /** Incoming data callback */ - mqtt_incoming_data_cb_t data_cb; + void *inpub_arg; + /** Incoming data callback */ + mqtt_incoming_data_cb_t data_cb; mqtt_incoming_publish_cb_t pub_cb; /** Input */ u32_t msg_idx; u8_t rx_buffer[MQTT_VAR_HEADER_BUFFER_LEN]; - /** Output ring-buffer */ - struct mqtt_ringbuf_t { - u16_t put; - u16_t get; - u8_t buf[MQTT_OUTPUT_RINGBUF_SIZE]; - } output; + /** Output ring-buffer */ + struct mqtt_ringbuf_t output; }; @@ -239,6 +246,10 @@ err_t mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_re err_t mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, mqtt_request_cb_t cb, void *arg); +/** + * @} + */ + #ifdef __cplusplus } #endif