mirror of
https://github.com/lwip-tcpip/lwip.git
synced 2024-09-17 20:24:24 +00:00
mqtt: convert to altcp API
This commit is contained in:
parent
1dfe916808
commit
26a6e034fc
@ -53,7 +53,8 @@
|
||||
#include "lwip/mem.h"
|
||||
#include "lwip/err.h"
|
||||
#include "lwip/pbuf.h"
|
||||
#include "lwip/tcp.h"
|
||||
#include "lwip/altcp.h"
|
||||
#include "lwip/altcp_tcp.h"
|
||||
#include <string.h>
|
||||
|
||||
#if LWIP_TCP && LWIP_CALLBACK_API
|
||||
@ -202,12 +203,12 @@ msg_generate_packet_id(mqtt_client_t *client)
|
||||
* @param tpcb TCP connection handle
|
||||
*/
|
||||
static void
|
||||
mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
|
||||
mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
|
||||
{
|
||||
err_t err;
|
||||
u8_t wrap = 0;
|
||||
u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
|
||||
u16_t send_len = tcp_sndbuf(tpcb);
|
||||
u16_t send_len = altcp_sndbuf(tpcb);
|
||||
LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
|
||||
|
||||
if (send_len == 0 || ringbuf_lin_len == 0) {
|
||||
@ -223,18 +224,18 @@ mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
|
||||
/* Wrap around if more data in ring buffer after linear portion */
|
||||
wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
|
||||
}
|
||||
err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
|
||||
err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
|
||||
if ((err == ERR_OK) && wrap) {
|
||||
mqtt_ringbuf_advance_get_idx(rb, send_len);
|
||||
/* Use the lesser one of ring buffer linear length and TCP send buffer size */
|
||||
send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
|
||||
err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
|
||||
send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
|
||||
err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
|
||||
}
|
||||
|
||||
if (err == ERR_OK) {
|
||||
mqtt_ringbuf_advance_get_idx(rb, send_len);
|
||||
/* Flush */
|
||||
tcp_output(tpcb);
|
||||
altcp_output(tpcb);
|
||||
} else {
|
||||
LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
|
||||
}
|
||||
@ -510,12 +511,12 @@ mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
|
||||
/* Bring down TCP connection if not already done */
|
||||
if (client->conn != NULL) {
|
||||
err_t res;
|
||||
tcp_recv(client->conn, NULL);
|
||||
tcp_err(client->conn, NULL);
|
||||
tcp_sent(client->conn, NULL);
|
||||
res = tcp_close(client->conn);
|
||||
altcp_recv(client->conn, NULL);
|
||||
altcp_err(client->conn, NULL);
|
||||
altcp_sent(client->conn, NULL);
|
||||
res = altcp_close(client->conn);
|
||||
if (res != ERR_OK) {
|
||||
tcp_abort(client->conn);
|
||||
altcp_abort(client->conn);
|
||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
|
||||
}
|
||||
client->conn = NULL;
|
||||
@ -857,7 +858,7 @@ mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
|
||||
* @return ERR_OK or err passed into callback
|
||||
*/
|
||||
static err_t
|
||||
mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
|
||||
mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
|
||||
{
|
||||
mqtt_client_t *client = (mqtt_client_t *)arg;
|
||||
LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
|
||||
@ -875,7 +876,7 @@ mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
|
||||
}
|
||||
|
||||
/* Tell remote that data has been received */
|
||||
tcp_recved(pcb, p->tot_len);
|
||||
altcp_recved(pcb, p->tot_len);
|
||||
res = mqtt_parse_incoming(client, p);
|
||||
pbuf_free(p);
|
||||
|
||||
@ -901,7 +902,7 @@ mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
|
||||
* @return ERR_OK
|
||||
*/
|
||||
static err_t
|
||||
mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
|
||||
mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
|
||||
{
|
||||
mqtt_client_t *client = (mqtt_client_t *)arg;
|
||||
|
||||
@ -952,7 +953,7 @@ mqtt_tcp_err_cb(void *arg, err_t err)
|
||||
* @return err ERR_OK
|
||||
*/
|
||||
static err_t
|
||||
mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
|
||||
mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
|
||||
{
|
||||
mqtt_client_t *client = (mqtt_client_t *)arg;
|
||||
if (client->conn_state == MQTT_CONNECTED) {
|
||||
@ -969,7 +970,7 @@ mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
|
||||
* @return ERR_OK
|
||||
*/
|
||||
static err_t
|
||||
mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
|
||||
mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
|
||||
{
|
||||
mqtt_client_t* client = (mqtt_client_t *)arg;
|
||||
|
||||
@ -982,9 +983,9 @@ mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
|
||||
client->msg_idx = 0;
|
||||
|
||||
/* Setup TCP callbacks */
|
||||
tcp_recv(tpcb, mqtt_tcp_recv_cb);
|
||||
tcp_sent(tpcb, mqtt_tcp_sent_cb);
|
||||
tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
|
||||
altcp_recv(tpcb, mqtt_tcp_recv_cb);
|
||||
altcp_sent(tpcb, mqtt_tcp_sent_cb);
|
||||
altcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
|
||||
|
||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
|
||||
/* Enter MQTT connect state */
|
||||
@ -1279,15 +1280,15 @@ mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port,
|
||||
return ERR_MEM;
|
||||
}
|
||||
|
||||
client->conn = tcp_new();
|
||||
client->conn = altcp_tcp_new();
|
||||
if (client->conn == NULL) {
|
||||
return ERR_MEM;
|
||||
}
|
||||
|
||||
/* Set arg pointer for callbacks */
|
||||
tcp_arg(client->conn, client);
|
||||
altcp_arg(client->conn, client);
|
||||
/* Any local address, pick random local port number */
|
||||
err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
|
||||
err = altcp_bind(client->conn, IP_ADDR_ANY, 0);
|
||||
if (err != ERR_OK) {
|
||||
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
|
||||
goto tcp_fail;
|
||||
@ -1295,13 +1296,13 @@ mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port,
|
||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
|
||||
|
||||
/* Connect to server */
|
||||
err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
|
||||
err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
|
||||
if (err != ERR_OK) {
|
||||
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
|
||||
goto tcp_fail;
|
||||
}
|
||||
/* Set error callback */
|
||||
tcp_err(client->conn, mqtt_tcp_err_cb);
|
||||
altcp_err(client->conn, mqtt_tcp_err_cb);
|
||||
client->conn_state = TCP_CONNECTING;
|
||||
|
||||
/* Append fixed header */
|
||||
@ -1332,7 +1333,7 @@ mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port,
|
||||
return ERR_OK;
|
||||
|
||||
tcp_fail:
|
||||
tcp_abort(client->conn);
|
||||
altcp_abort(client->conn);
|
||||
client->conn = NULL;
|
||||
return err;
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ struct mqtt_client_t
|
||||
u16_t inpub_pkt_id;
|
||||
/** Connection state */
|
||||
u8_t conn_state;
|
||||
struct tcp_pcb *conn;
|
||||
struct altcp_pcb *conn;
|
||||
/** Connection callback */
|
||||
void *connect_arg;
|
||||
mqtt_connection_cb_t connect_cb;
|
||||
|
Loading…
Reference in New Issue
Block a user