mirror of
https://github.com/lwip-tcpip/lwip.git
synced 2025-04-16 08:43:17 +00:00
Some cleanups in MQTT client
Integrate in documentation Compile fixes, mostly: Variables must be declared before any statement in a function
This commit is contained in:
parent
1e82465766
commit
14e36866f5
@ -2,6 +2,8 @@
|
|||||||
* @file
|
* @file
|
||||||
* MQTT client
|
* MQTT client
|
||||||
*
|
*
|
||||||
|
* @defgroup mqtt MQTT Client
|
||||||
|
* @ingroup apps
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -51,7 +53,7 @@
|
|||||||
#include "lwip/err.h"
|
#include "lwip/err.h"
|
||||||
#include "lwip/pbuf.h"
|
#include "lwip/pbuf.h"
|
||||||
#include "lwip/tcp.h"
|
#include "lwip/tcp.h"
|
||||||
#include "mqtt.h"
|
#include "lwip/apps/mqtt.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MQTT_DEBUG: Default is off.
|
* MQTT_DEBUG: Default is off.
|
||||||
@ -139,7 +141,7 @@ enum mqtt_connect_flag {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
#if (MQTT_DEBUG == LWIP_DBG_ON) && defined(LWIP_DEBUG)
|
#if defined(LWIP_DEBUG)
|
||||||
static const char * const mqtt_message_type_str[15] =
|
static const char * const mqtt_message_type_str[15] =
|
||||||
{
|
{
|
||||||
"UNDEFINED",
|
"UNDEFINED",
|
||||||
@ -223,11 +225,11 @@ msg_generate_packet_id(mqtt_client_t *client)
|
|||||||
static void
|
static void
|
||||||
mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
|
mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
|
|
||||||
err_t err;
|
err_t err;
|
||||||
u8_t wrap = 0;
|
u8_t wrap = 0;
|
||||||
u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
|
u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
|
||||||
u16_t send_len = tcp_sndbuf(tpcb);
|
u16_t send_len = tcp_sndbuf(tpcb);
|
||||||
|
LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
|
||||||
|
|
||||||
if(send_len == 0 || ringbuf_lin_len == 0) {
|
if(send_len == 0 || ringbuf_lin_len == 0) {
|
||||||
return;
|
return;
|
||||||
@ -275,9 +277,9 @@ mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
|
|||||||
static struct mqtt_request_t *
|
static struct mqtt_request_t *
|
||||||
mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
|
mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
|
|
||||||
struct mqtt_request_t *r = NULL;
|
struct mqtt_request_t *r = NULL;
|
||||||
u8_t n;
|
u8_t n;
|
||||||
|
LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
|
||||||
for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT && r == NULL; n++) {
|
for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT && r == NULL; n++) {
|
||||||
/* Item point to itself if not in use */
|
/* Item point to itself if not in use */
|
||||||
if(r_objs[n].next == &r_objs[n]) {
|
if(r_objs[n].next == &r_objs[n]) {
|
||||||
@ -300,12 +302,12 @@ mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb
|
|||||||
static void
|
static void
|
||||||
mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
|
mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
|
|
||||||
|
|
||||||
struct mqtt_request_t *head = NULL;
|
struct mqtt_request_t *head = NULL;
|
||||||
s16_t time_before = 0;
|
s16_t time_before = 0;
|
||||||
struct mqtt_request_t *iter = *tail;
|
struct mqtt_request_t *iter = *tail;
|
||||||
|
|
||||||
|
LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
|
||||||
|
|
||||||
/* Iterate trough queue to find head, and count total timeout time */
|
/* Iterate trough queue to find head, and count total timeout time */
|
||||||
for(iter = *tail; iter != NULL; iter = iter->next) {
|
for(iter = *tail; iter != NULL; iter = iter->next) {
|
||||||
time_before += iter->timeout_diff;
|
time_before += iter->timeout_diff;
|
||||||
@ -343,8 +345,8 @@ mqtt_delete_request(struct mqtt_request_t *r)
|
|||||||
static struct mqtt_request_t *
|
static struct mqtt_request_t *
|
||||||
mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
|
mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
|
|
||||||
struct mqtt_request_t *iter = NULL, *prev = NULL;
|
struct mqtt_request_t *iter = NULL, *prev = NULL;
|
||||||
|
LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
|
||||||
/* Search all request for pkt_id */
|
/* Search all request for pkt_id */
|
||||||
for(iter = *tail; iter != NULL; iter = iter->next) {
|
for(iter = *tail; iter != NULL; iter = iter->next) {
|
||||||
if(iter->pkt_id == pkt_id) {
|
if(iter->pkt_id == pkt_id) {
|
||||||
@ -378,8 +380,8 @@ mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
|
|||||||
static void
|
static void
|
||||||
mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
|
mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
|
|
||||||
struct mqtt_request_t *r = *tail;
|
struct mqtt_request_t *r = *tail;
|
||||||
|
LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
|
||||||
while(t > 0 && r != NULL) {
|
while(t > 0 && r != NULL) {
|
||||||
if(t >= r->timeout_diff) {
|
if(t >= r->timeout_diff) {
|
||||||
t -= r->timeout_diff;
|
t -= r->timeout_diff;
|
||||||
@ -406,8 +408,8 @@ mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
|
|||||||
static void
|
static void
|
||||||
mqtt_clear_requests(struct mqtt_request_t **tail)
|
mqtt_clear_requests(struct mqtt_request_t **tail)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
|
|
||||||
struct mqtt_request_t *iter, *next;
|
struct mqtt_request_t *iter, *next;
|
||||||
|
LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
|
||||||
for(iter = *tail; iter != NULL; iter = next) {
|
for(iter = *tail; iter != NULL; iter = next) {
|
||||||
next = iter->next;
|
next = iter->next;
|
||||||
mqtt_delete_request(iter);
|
mqtt_delete_request(iter);
|
||||||
@ -421,8 +423,8 @@ mqtt_clear_requests(struct mqtt_request_t **tail)
|
|||||||
static void
|
static void
|
||||||
mqtt_init_requests(struct mqtt_request_t *r_objs)
|
mqtt_init_requests(struct mqtt_request_t *r_objs)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
|
|
||||||
u8_t n;
|
u8_t n;
|
||||||
|
LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
|
||||||
for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
|
for(n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
|
||||||
/* Item pointing to itself indicates unused */
|
/* Item pointing to itself indicates unused */
|
||||||
r_objs[n].next = &r_objs[n];
|
r_objs[n].next = &r_objs[n];
|
||||||
@ -442,7 +444,6 @@ mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
|
|||||||
static
|
static
|
||||||
void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
|
void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
|
||||||
{
|
{
|
||||||
|
|
||||||
mqtt_ringbuf_put(rb, value >> 8);
|
mqtt_ringbuf_put(rb, value >> 8);
|
||||||
mqtt_ringbuf_put(rb, value & 0xff);
|
mqtt_ringbuf_put(rb, value & 0xff);
|
||||||
}
|
}
|
||||||
@ -459,9 +460,9 @@ mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length
|
|||||||
static void
|
static void
|
||||||
mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
|
mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
|
||||||
{
|
{
|
||||||
|
u16_t n;
|
||||||
mqtt_ringbuf_put(rb, length >> 8);
|
mqtt_ringbuf_put(rb, length >> 8);
|
||||||
mqtt_ringbuf_put(rb, length & 0xff);
|
mqtt_ringbuf_put(rb, length & 0xff);
|
||||||
u16_t n;
|
|
||||||
for(n=0; n < length; n++) {
|
for(n=0; n < length; n++) {
|
||||||
mqtt_ringbuf_put(rb, str[n]);
|
mqtt_ringbuf_put(rb, str[n]);
|
||||||
}
|
}
|
||||||
@ -481,7 +482,6 @@ static void
|
|||||||
mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
|
mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
|
||||||
u8_t qos, u8_t retain, u16_t r_length)
|
u8_t qos, u8_t retain, u16_t r_length)
|
||||||
{
|
{
|
||||||
|
|
||||||
/* Start with control byte */
|
/* Start with control byte */
|
||||||
mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
|
mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
|
||||||
/* Encode remaining length field */
|
/* Encode remaining length field */
|
||||||
@ -501,11 +501,11 @@ mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t d
|
|||||||
static u8_t
|
static u8_t
|
||||||
mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
|
mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
|
||||||
{
|
{
|
||||||
LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
|
|
||||||
|
|
||||||
/* Start with length of type byte + remaining length */
|
/* Start with length of type byte + remaining length */
|
||||||
u16_t total_len = 1 + r_length;
|
u16_t total_len = 1 + r_length;
|
||||||
|
|
||||||
|
LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
|
||||||
|
|
||||||
/* Calculate number of required bytes to contain the remaining bytes field and add to total*/
|
/* Calculate number of required bytes to contain the remaining bytes field and add to total*/
|
||||||
do {
|
do {
|
||||||
total_len++;
|
total_len++;
|
||||||
@ -670,7 +670,7 @@ mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u
|
|||||||
if(pkt_type == MQTT_MSG_TYPE_CONNACK) {
|
if(pkt_type == MQTT_MSG_TYPE_CONNACK) {
|
||||||
if(client->conn_state == MQTT_CONNECTING) {
|
if(client->conn_state == MQTT_CONNECTING) {
|
||||||
/* Get result code from CONNACK */
|
/* Get result code from CONNACK */
|
||||||
res = var_hdr_payload[1];
|
res = (mqtt_connection_status_t)var_hdr_payload[1];
|
||||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
|
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
|
||||||
if(res == MQTT_CONNECT_ACCEPTED) {
|
if(res == MQTT_CONNECT_ACCEPTED) {
|
||||||
/* Reset cyclic_tick when changing to connected state */
|
/* Reset cyclic_tick when changing to connected state */
|
||||||
@ -918,9 +918,10 @@ mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
|
|||||||
static err_t
|
static err_t
|
||||||
mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
|
mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
|
||||||
{
|
{
|
||||||
|
mqtt_client_t *client = (mqtt_client_t *)arg;
|
||||||
|
|
||||||
LWIP_UNUSED_ARG(tpcb);
|
LWIP_UNUSED_ARG(tpcb);
|
||||||
LWIP_UNUSED_ARG(len);
|
LWIP_UNUSED_ARG(len);
|
||||||
mqtt_client_t *client = (mqtt_client_t *)arg;
|
|
||||||
|
|
||||||
if(client->conn_state == MQTT_CONNECTED) {
|
if(client->conn_state == MQTT_CONNECTED) {
|
||||||
|
|
||||||
@ -984,11 +985,12 @@ mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
|
|||||||
static err_t
|
static err_t
|
||||||
mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
|
mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
|
||||||
{
|
{
|
||||||
|
mqtt_client_t* client = (mqtt_client_t *)arg;
|
||||||
|
|
||||||
if(err != ERR_OK) {
|
if(err != ERR_OK) {
|
||||||
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
|
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
mqtt_client_t* client = (mqtt_client_t *)arg;
|
|
||||||
|
|
||||||
/* Initiate receiver state */
|
/* Initiate receiver state */
|
||||||
client->msg_idx = 0;
|
client->msg_idx = 0;
|
||||||
@ -1025,6 +1027,7 @@ mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
|
|||||||
* @param payload Data to publish (NULL is allowed)
|
* @param payload Data to publish (NULL is allowed)
|
||||||
* @param payload_length: Length of payload (0 is allowed)
|
* @param payload_length: Length of payload (0 is allowed)
|
||||||
* @param qos Quality of service, 0 1 or 2
|
* @param qos Quality of service, 0 1 or 2
|
||||||
|
* @param retain MQTT retain flag
|
||||||
* @param cb Callback to call when publish is complete or has timed out
|
* @param cb Callback to call when publish is complete or has timed out
|
||||||
* @param arg User supplied argument to publish callback
|
* @param arg User supplied argument to publish callback
|
||||||
* @return ERR_OK if successful
|
* @return ERR_OK if successful
|
||||||
@ -1035,17 +1038,17 @@ err_t
|
|||||||
mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
|
mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
|
||||||
mqtt_request_cb_t cb, void *arg)
|
mqtt_request_cb_t cb, void *arg)
|
||||||
{
|
{
|
||||||
|
struct mqtt_request_t *r;
|
||||||
|
u16_t pkt_id;
|
||||||
|
u16_t topic_len = strlen(topic);
|
||||||
|
u16_t remaining_length = 2 + topic_len + payload_length;
|
||||||
|
|
||||||
LWIP_ASSERT("mqtt_publish: client != NULL", client);
|
LWIP_ASSERT("mqtt_publish: client != NULL", client);
|
||||||
LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
|
LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
|
||||||
LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
|
LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
|
||||||
|
|
||||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
|
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
|
||||||
|
|
||||||
struct mqtt_request_t *r;
|
|
||||||
u16_t pkt_id;
|
|
||||||
u16_t topic_len = strlen(topic);
|
|
||||||
u16_t remaining_length = 2 + topic_len + payload_length;
|
|
||||||
|
|
||||||
if(qos > 0) {
|
if(qos > 0) {
|
||||||
remaining_length += 2;
|
remaining_length += 2;
|
||||||
/* Generate pkt_id id for QoS1 and 2 */
|
/* Generate pkt_id id for QoS1 and 2 */
|
||||||
@ -1099,6 +1102,13 @@ mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_
|
|||||||
err_t
|
err_t
|
||||||
mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
|
mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
|
||||||
{
|
{
|
||||||
|
u16_t topic_len = strlen(topic);
|
||||||
|
/* Topic string, pkt_id, qos for subscribe */
|
||||||
|
u16_t remaining_length = topic_len + 2 + 2 + (sub != 0);
|
||||||
|
|
||||||
|
u16_t pkt_id = msg_generate_packet_id(client);
|
||||||
|
struct mqtt_request_t *r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
|
||||||
|
|
||||||
LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
|
LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
|
||||||
LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
|
LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
|
||||||
LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
|
LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
|
||||||
@ -1107,12 +1117,6 @@ mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_
|
|||||||
return ERR_CONN;
|
return ERR_CONN;
|
||||||
}
|
}
|
||||||
|
|
||||||
u16_t topic_len = strlen(topic);
|
|
||||||
/* Topic string, pkt_id, qos for subscribe */
|
|
||||||
u16_t remaining_length = topic_len + 2 + 2 + (sub != 0);
|
|
||||||
|
|
||||||
u16_t pkt_id = msg_generate_packet_id(client);
|
|
||||||
struct mqtt_request_t *r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
|
|
||||||
if(r == NULL) {
|
if(r == NULL) {
|
||||||
return ERR_MEM;
|
return ERR_MEM;
|
||||||
}
|
}
|
||||||
@ -1159,7 +1163,6 @@ mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new MQTT client instance
|
* Create a new MQTT client instance
|
||||||
* @param client MQTT client
|
|
||||||
* @return Pointer to instance on success, NULL otherwise
|
* @return Pointer to instance on success, NULL otherwise
|
||||||
*/
|
*/
|
||||||
mqtt_client_t *
|
mqtt_client_t *
|
||||||
@ -1186,6 +1189,12 @@ err_t
|
|||||||
mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_t cb, void *arg,
|
mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_t cb, void *arg,
|
||||||
const struct mqtt_connect_client_info_t *client_info)
|
const struct mqtt_connect_client_info_t *client_info)
|
||||||
{
|
{
|
||||||
|
err_t err;
|
||||||
|
ip_addr_t ip_addr;
|
||||||
|
u16_t port = 1883;
|
||||||
|
/* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
|
||||||
|
u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
|
||||||
|
u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
|
||||||
|
|
||||||
LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
|
LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
|
||||||
LWIP_ASSERT("mqtt_client_connect: host != NULL", host != NULL);
|
LWIP_ASSERT("mqtt_client_connect: host != NULL", host != NULL);
|
||||||
@ -1196,9 +1205,6 @@ mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_
|
|||||||
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
|
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
|
||||||
return ERR_ISCONN;
|
return ERR_ISCONN;
|
||||||
}
|
}
|
||||||
err_t err;
|
|
||||||
ip_addr_t ip_addr;
|
|
||||||
u16_t port = 1883;
|
|
||||||
|
|
||||||
if(ipaddr_aton(host, &ip_addr) == 0) {
|
if(ipaddr_aton(host, &ip_addr) == 0) {
|
||||||
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Illegal hostname: %s\n", host));
|
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Illegal hostname: %s\n", host));
|
||||||
@ -1212,10 +1218,6 @@ mqtt_client_connect(mqtt_client_t *client, const char *host, mqtt_connection_cb_
|
|||||||
mqtt_init_requests(client->req_list);
|
mqtt_init_requests(client->req_list);
|
||||||
|
|
||||||
/* Build connect message */
|
/* Build connect message */
|
||||||
/* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
|
|
||||||
u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
|
|
||||||
u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
|
|
||||||
|
|
||||||
if(client_info->will_topic != NULL && client_info->will_msg != NULL) {
|
if(client_info->will_topic != NULL && client_info->will_msg != NULL) {
|
||||||
flags |= MQTT_CONNECT_FLAG_WILL;
|
flags |= MQTT_CONNECT_FLAG_WILL;
|
||||||
flags |= (client_info->will_qos & 3) << 3;
|
flags |= (client_info->will_qos & 3) << 3;
|
||||||
@ -1284,7 +1286,6 @@ tcp_fail:
|
|||||||
tcp_abort(client->conn);
|
tcp_abort(client->conn);
|
||||||
client->conn = NULL;
|
client->conn = NULL;
|
||||||
return err;
|
return err;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1300,7 +1301,7 @@ mqtt_disconnect(mqtt_client_t *client)
|
|||||||
if(client->conn_state != TCP_DISCONNECTED) {
|
if(client->conn_state != TCP_DISCONNECTED) {
|
||||||
/* Set conn_state before calling mqtt_close to prevent callback from being called */
|
/* Set conn_state before calling mqtt_close to prevent callback from being called */
|
||||||
client->conn_state = TCP_DISCONNECTED;
|
client->conn_state = TCP_DISCONNECTED;
|
||||||
mqtt_close(client, 0);
|
mqtt_close(client, (mqtt_connection_status_t)0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,10 @@
|
|||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/** @ingroup mqtt
|
||||||
|
* @{
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Output ring-buffer size, must be able to fit largest outgoing publish message topic+payloads
|
* Output ring-buffer size, must be able to fit largest outgoing publish message topic+payloads
|
||||||
*/
|
*/
|
||||||
@ -173,39 +177,42 @@ struct mqtt_request_t
|
|||||||
u16_t timeout_diff;
|
u16_t timeout_diff;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** Ring buffer */
|
||||||
|
struct mqtt_ringbuf_t {
|
||||||
|
u16_t put;
|
||||||
|
u16_t get;
|
||||||
|
u8_t buf[MQTT_OUTPUT_RINGBUF_SIZE];
|
||||||
|
};
|
||||||
|
|
||||||
/** MQTT client */
|
/** MQTT client */
|
||||||
struct mqtt_client_t
|
struct mqtt_client_t
|
||||||
{
|
{
|
||||||
/** Timers and timeouts */
|
/** Timers and timeouts */
|
||||||
u16_t cyclic_tick;
|
u16_t cyclic_tick;
|
||||||
u16_t keep_alive;
|
u16_t keep_alive;
|
||||||
u16_t server_watchdog;
|
u16_t server_watchdog;
|
||||||
/** Packet identifier generator*/
|
/** Packet identifier generator*/
|
||||||
u16_t pkt_id_seq;
|
u16_t pkt_id_seq;
|
||||||
/** Packet identifier of pending incoming publish */
|
/** Packet identifier of pending incoming publish */
|
||||||
u16_t inpub_pkt_id;
|
u16_t inpub_pkt_id;
|
||||||
/** Connection state */
|
/** Connection state */
|
||||||
u8_t conn_state;
|
u8_t conn_state;
|
||||||
struct tcp_pcb *conn;
|
struct tcp_pcb *conn;
|
||||||
/** Connection callback */
|
/** Connection callback */
|
||||||
void *connect_arg;
|
void *connect_arg;
|
||||||
mqtt_connection_cb_t connect_cb;
|
mqtt_connection_cb_t connect_cb;
|
||||||
/** Pending requests to server */
|
/** Pending requests to server */
|
||||||
struct mqtt_request_t *pend_req_queue;
|
struct mqtt_request_t *pend_req_queue;
|
||||||
struct mqtt_request_t req_list[MQTT_REQ_MAX_IN_FLIGHT];
|
struct mqtt_request_t req_list[MQTT_REQ_MAX_IN_FLIGHT];
|
||||||
void *inpub_arg;
|
void *inpub_arg;
|
||||||
/** Incoming data callback */
|
/** Incoming data callback */
|
||||||
mqtt_incoming_data_cb_t data_cb;
|
mqtt_incoming_data_cb_t data_cb;
|
||||||
mqtt_incoming_publish_cb_t pub_cb;
|
mqtt_incoming_publish_cb_t pub_cb;
|
||||||
/** Input */
|
/** Input */
|
||||||
u32_t msg_idx;
|
u32_t msg_idx;
|
||||||
u8_t rx_buffer[MQTT_VAR_HEADER_BUFFER_LEN];
|
u8_t rx_buffer[MQTT_VAR_HEADER_BUFFER_LEN];
|
||||||
/** Output ring-buffer */
|
/** Output ring-buffer */
|
||||||
struct mqtt_ringbuf_t {
|
struct mqtt_ringbuf_t output;
|
||||||
u16_t put;
|
|
||||||
u16_t get;
|
|
||||||
u8_t buf[MQTT_OUTPUT_RINGBUF_SIZE];
|
|
||||||
} output;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -239,6 +246,10 @@ err_t mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_re
|
|||||||
err_t mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
|
err_t mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
|
||||||
mqtt_request_cb_t cb, void *arg);
|
mqtt_request_cb_t cb, void *arg);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @}
|
||||||
|
*/
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
x
Reference in New Issue
Block a user