Commit c2d83a9d authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by Cesanta Bot

Don't touch send_mbuf when sending MQTT messages

h/t @M4GNV5

Fixes https://github.com/cesanta/mongoose/issues/943
Closes https://github.com/cesanta/mongoose/issues/944

CL: mg: Don't touch send_mbuf when sending MQTT messages

PUBLISHED_FROM=da7b4f8acec2e403caa4addd5739d036a6a74c76
parent f33d3a4e
...@@ -10741,18 +10741,13 @@ void mg_set_protocol_mqtt(struct mg_connection *nc) { ...@@ -10741,18 +10741,13 @@ void mg_set_protocol_mqtt(struct mg_connection *nc) {
nc->proto_data_destructor = mg_mqtt_proto_data_destructor; nc->proto_data_destructor = mg_mqtt_proto_data_destructor;
} }
static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd, static void mg_send_mqtt_header(struct mg_connection *nc, uint8_t cmd,
uint8_t flags, size_t len) { uint8_t flags, size_t len) {
struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data; struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data;
size_t off = nc->send_mbuf.len - len;
uint8_t header = cmd << 4 | (uint8_t) flags;
uint8_t buf[1 + sizeof(size_t)]; uint8_t buf[1 + sizeof(size_t)];
uint8_t *vlen = &buf[1]; uint8_t *vlen = &buf[1];
assert(nc->send_mbuf.len >= len); buf[0] = (cmd << 4) | flags;
buf[0] = header;
/* mqtt variable length encoding */ /* mqtt variable length encoding */
do { do {
...@@ -10762,7 +10757,7 @@ static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd, ...@@ -10762,7 +10757,7 @@ static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd,
vlen++; vlen++;
} while (len > 0); } while (len > 0);
mbuf_insert(&nc->send_mbuf, off, buf, vlen - buf); mg_send(nc, buf, vlen - buf);
pd->last_control_time = mg_time(); pd->last_control_time = mg_time();
} }
...@@ -10773,11 +10768,16 @@ void mg_send_mqtt_handshake(struct mg_connection *nc, const char *client_id) { ...@@ -10773,11 +10768,16 @@ void mg_send_mqtt_handshake(struct mg_connection *nc, const char *client_id) {
void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
struct mg_send_mqtt_handshake_opts opts) { struct mg_send_mqtt_handshake_opts opts) {
uint16_t hlen, nlen, rem_len = 0;
struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data; struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data;
uint16_t id_len = 0, wt_len = 0, wm_len = 0, user_len = 0, pw_len = 0;
uint16_t netbytes;
size_t total_len;
mg_send(nc, "\00\04MQTT\04", 7); if (client_id != NULL) {
rem_len += 7; id_len = strlen(client_id);
}
total_len = 7 + 1 + 2 + 2 + id_len;
if (opts.user_name != NULL) { if (opts.user_name != NULL) {
opts.flags |= MG_MQTT_HAS_USER_NAME; opts.flags |= MG_MQTT_HAS_USER_NAME;
...@@ -10786,56 +10786,58 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, ...@@ -10786,56 +10786,58 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
opts.flags |= MG_MQTT_HAS_PASSWORD; opts.flags |= MG_MQTT_HAS_PASSWORD;
} }
if (opts.will_topic != NULL && opts.will_message != NULL) { if (opts.will_topic != NULL && opts.will_message != NULL) {
wt_len = strlen(opts.will_topic);
wm_len = strlen(opts.will_message);
opts.flags |= MG_MQTT_HAS_WILL; opts.flags |= MG_MQTT_HAS_WILL;
} }
if (opts.keep_alive == 0) { if (opts.keep_alive == 0) {
opts.keep_alive = 60; opts.keep_alive = 60;
} }
if (opts.flags & MG_MQTT_HAS_WILL) {
total_len += 2 + wt_len + 2 + wm_len;
}
if (opts.flags & MG_MQTT_HAS_USER_NAME) {
user_len = strlen(opts.user_name);
total_len += 2 + user_len;
}
if (opts.flags & MG_MQTT_HAS_PASSWORD) {
pw_len = strlen(opts.password);
total_len += 2 + pw_len;
}
mg_send_mqtt_header(nc, MG_MQTT_CMD_CONNECT, 0, total_len);
mg_send(nc, "\00\04MQTT\04", 7);
mg_send(nc, &opts.flags, 1); mg_send(nc, &opts.flags, 1);
rem_len += 1;
nlen = htons(opts.keep_alive); netbytes = htons(opts.keep_alive);
mg_send(nc, &nlen, 2); mg_send(nc, &netbytes, 2);
rem_len += 2;
hlen = strlen(client_id); netbytes = htons(id_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, client_id, id_len);
mg_send(nc, client_id, hlen);
rem_len += 2 + hlen;
if (opts.flags & MG_MQTT_HAS_WILL) { if (opts.flags & MG_MQTT_HAS_WILL) {
hlen = strlen(opts.will_topic); netbytes = htons(wt_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.will_topic, wt_len);
mg_send(nc, opts.will_topic, hlen);
rem_len += 2 + hlen;
hlen = strlen(opts.will_message); netbytes = htons(wm_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.will_message, wm_len);
mg_send(nc, opts.will_message, hlen);
rem_len += 2 + hlen;
} }
if (opts.flags & MG_MQTT_HAS_USER_NAME) { if (opts.flags & MG_MQTT_HAS_USER_NAME) {
hlen = strlen(opts.user_name); netbytes = htons(user_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.user_name, user_len);
mg_send(nc, opts.user_name, hlen);
rem_len += 2 + hlen;
} }
if (opts.flags & MG_MQTT_HAS_PASSWORD) { if (opts.flags & MG_MQTT_HAS_PASSWORD) {
hlen = strlen(opts.password); netbytes = htons(pw_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.password, pw_len);
mg_send(nc, opts.password, hlen);
rem_len += 2 + hlen;
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_CONNECT, 0, rem_len);
if (pd != NULL) { if (pd != NULL) {
pd->keep_alive = opts.keep_alive; pd->keep_alive = opts.keep_alive;
} }
...@@ -10844,40 +10846,52 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, ...@@ -10844,40 +10846,52 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
void mg_mqtt_publish(struct mg_connection *nc, const char *topic, void mg_mqtt_publish(struct mg_connection *nc, const char *topic,
uint16_t message_id, int flags, const void *data, uint16_t message_id, int flags, const void *data,
size_t len) { size_t len) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t topic_len = strlen(topic);
uint16_t topic_len = htons((uint16_t) strlen(topic)); size_t total_len = 2 + topic_len + len;
uint16_t message_id_net = htons(message_id); if (MG_MQTT_GET_QOS(flags) > 0) {
total_len += 2;
}
mg_send_mqtt_header(nc, MG_MQTT_CMD_PUBLISH, flags, total_len);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topic, topic_len);
mg_send(nc, &topic_len, 2);
mg_send(nc, topic, strlen(topic));
if (MG_MQTT_GET_QOS(flags) > 0) { if (MG_MQTT_GET_QOS(flags) > 0) {
mg_send(nc, &message_id_net, 2); netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
} }
mg_send(nc, data, len);
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PUBLISH, flags, mg_send(nc, data, len);
nc->send_mbuf.len - old_len);
} }
void mg_mqtt_subscribe(struct mg_connection *nc, void mg_mqtt_subscribe(struct mg_connection *nc,
const struct mg_mqtt_topic_expression *topics, const struct mg_mqtt_topic_expression *topics,
size_t topics_len, uint16_t message_id) { size_t topics_len, uint16_t message_id) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t message_id_n = htons(message_id);
size_t i; size_t i;
uint16_t topic_len;
size_t total_len = 2;
mg_send(nc, (char *) &message_id_n, 2);
for (i = 0; i < topics_len; i++) { for (i = 0; i < topics_len; i++) {
uint16_t topic_len_n = htons((uint16_t) strlen(topics[i].topic)); total_len += 2 + strlen(topics[i].topic) + 1;
mg_send(nc, &topic_len_n, 2);
mg_send(nc, topics[i].topic, strlen(topics[i].topic));
mg_send(nc, &topics[i].qos, 1);
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_SUBSCRIBE, MG_MQTT_QOS(1), mg_send_mqtt_header(nc, MG_MQTT_CMD_SUBSCRIBE, MG_MQTT_QOS(1), total_len);
nc->send_mbuf.len - old_len);
netbytes = htons(message_id);
mg_send(nc, (char *) &netbytes, 2);
for (i = 0; i < topics_len; i++) {
topic_len = strlen(topics[i].topic);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topics[i].topic, topic_len);
mg_send(nc, &topics[i].qos, 1);
}
} }
int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg, int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
...@@ -10897,27 +10911,33 @@ int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg, ...@@ -10897,27 +10911,33 @@ int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
void mg_mqtt_unsubscribe(struct mg_connection *nc, char **topics, void mg_mqtt_unsubscribe(struct mg_connection *nc, char **topics,
size_t topics_len, uint16_t message_id) { size_t topics_len, uint16_t message_id) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t message_id_n = htons(message_id);
size_t i; size_t i;
uint16_t topic_len;
size_t total_len = 2;
mg_send(nc, (char *) &message_id_n, 2);
for (i = 0; i < topics_len; i++) { for (i = 0; i < topics_len; i++) {
uint16_t topic_len_n = htons((uint16_t) strlen(topics[i])); total_len += 2 + strlen(topics[i]);
mg_send(nc, &topic_len_n, 2);
mg_send(nc, topics[i], strlen(topics[i]));
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_UNSUBSCRIBE, MG_MQTT_QOS(1), mg_send_mqtt_header(nc, MG_MQTT_CMD_UNSUBSCRIBE, MG_MQTT_QOS(1), total_len);
nc->send_mbuf.len - old_len);
netbytes = htons(message_id);
mg_send(nc, (char *) &netbytes, 2);
for (i = 0; i < topics_len; i++) {
topic_len = strlen(topics[i]);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topics[i], topic_len);
}
} }
void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) { void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) {
uint8_t unused = 0; uint8_t unused = 0;
mg_send_mqtt_header(nc, MG_MQTT_CMD_CONNACK, 0, 2);
mg_send(nc, &unused, 1); mg_send(nc, &unused, 1);
mg_send(nc, &return_code, 1); mg_send(nc, &return_code, 1);
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_CONNACK, 0, 2);
} }
/* /*
...@@ -10927,10 +10947,13 @@ void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) { ...@@ -10927,10 +10947,13 @@ void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) {
*/ */
static void mg_send_mqtt_short_command(struct mg_connection *nc, uint8_t cmd, static void mg_send_mqtt_short_command(struct mg_connection *nc, uint8_t cmd,
uint16_t message_id) { uint16_t message_id) {
uint16_t message_id_net = htons(message_id); uint16_t netbytes;
uint8_t flags = (cmd == MG_MQTT_CMD_PUBREL ? 2 : 0); uint8_t flags = (cmd == MG_MQTT_CMD_PUBREL ? 2 : 0);
mg_send(nc, &message_id_net, 2);
mg_mqtt_prepend_header(nc, cmd, flags, 2 /* len */); mg_send_mqtt_header(nc, cmd, flags, 2 /* len */);
netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
} }
void mg_mqtt_puback(struct mg_connection *nc, uint16_t message_id) { void mg_mqtt_puback(struct mg_connection *nc, uint16_t message_id) {
...@@ -10952,12 +10975,16 @@ void mg_mqtt_pubcomp(struct mg_connection *nc, uint16_t message_id) { ...@@ -10952,12 +10975,16 @@ void mg_mqtt_pubcomp(struct mg_connection *nc, uint16_t message_id) {
void mg_mqtt_suback(struct mg_connection *nc, uint8_t *qoss, size_t qoss_len, void mg_mqtt_suback(struct mg_connection *nc, uint8_t *qoss, size_t qoss_len,
uint16_t message_id) { uint16_t message_id) {
size_t i; size_t i;
uint16_t message_id_net = htons(message_id); uint16_t netbytes;
mg_send(nc, &message_id_net, 2);
mg_send_mqtt_header(nc, MG_MQTT_CMD_SUBACK, MG_MQTT_QOS(1), 2 + qoss_len);
netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
for (i = 0; i < qoss_len; i++) { for (i = 0; i < qoss_len; i++) {
mg_send(nc, &qoss[i], 1); mg_send(nc, &qoss[i], 1);
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_SUBACK, MG_MQTT_QOS(1), 2 + qoss_len);
} }
void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) { void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) {
...@@ -10965,15 +10992,15 @@ void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) { ...@@ -10965,15 +10992,15 @@ void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) {
} }
void mg_mqtt_ping(struct mg_connection *nc) { void mg_mqtt_ping(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PINGREQ, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_PINGREQ, 0, 0);
} }
void mg_mqtt_pong(struct mg_connection *nc) { void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PINGRESP, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_PINGRESP, 0, 0);
} }
void mg_mqtt_disconnect(struct mg_connection *nc) { void mg_mqtt_disconnect(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_DISCONNECT, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_DISCONNECT, 0, 0);
} }
#endif /* MG_ENABLE_MQTT */ #endif /* MG_ENABLE_MQTT */
......
...@@ -228,18 +228,13 @@ void mg_set_protocol_mqtt(struct mg_connection *nc) { ...@@ -228,18 +228,13 @@ void mg_set_protocol_mqtt(struct mg_connection *nc) {
nc->proto_data_destructor = mg_mqtt_proto_data_destructor; nc->proto_data_destructor = mg_mqtt_proto_data_destructor;
} }
static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd, static void mg_send_mqtt_header(struct mg_connection *nc, uint8_t cmd,
uint8_t flags, size_t len) { uint8_t flags, size_t len) {
struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data; struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data;
size_t off = nc->send_mbuf.len - len;
uint8_t header = cmd << 4 | (uint8_t) flags;
uint8_t buf[1 + sizeof(size_t)]; uint8_t buf[1 + sizeof(size_t)];
uint8_t *vlen = &buf[1]; uint8_t *vlen = &buf[1];
assert(nc->send_mbuf.len >= len); buf[0] = (cmd << 4) | flags;
buf[0] = header;
/* mqtt variable length encoding */ /* mqtt variable length encoding */
do { do {
...@@ -249,7 +244,7 @@ static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd, ...@@ -249,7 +244,7 @@ static void mg_mqtt_prepend_header(struct mg_connection *nc, uint8_t cmd,
vlen++; vlen++;
} while (len > 0); } while (len > 0);
mbuf_insert(&nc->send_mbuf, off, buf, vlen - buf); mg_send(nc, buf, vlen - buf);
pd->last_control_time = mg_time(); pd->last_control_time = mg_time();
} }
...@@ -260,11 +255,16 @@ void mg_send_mqtt_handshake(struct mg_connection *nc, const char *client_id) { ...@@ -260,11 +255,16 @@ void mg_send_mqtt_handshake(struct mg_connection *nc, const char *client_id) {
void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
struct mg_send_mqtt_handshake_opts opts) { struct mg_send_mqtt_handshake_opts opts) {
uint16_t hlen, nlen, rem_len = 0;
struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data; struct mg_mqtt_proto_data *pd = (struct mg_mqtt_proto_data *) nc->proto_data;
uint16_t id_len = 0, wt_len = 0, wm_len = 0, user_len = 0, pw_len = 0;
uint16_t netbytes;
size_t total_len;
mg_send(nc, "\00\04MQTT\04", 7); if (client_id != NULL) {
rem_len += 7; id_len = strlen(client_id);
}
total_len = 7 + 1 + 2 + 2 + id_len;
if (opts.user_name != NULL) { if (opts.user_name != NULL) {
opts.flags |= MG_MQTT_HAS_USER_NAME; opts.flags |= MG_MQTT_HAS_USER_NAME;
...@@ -273,56 +273,58 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, ...@@ -273,56 +273,58 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
opts.flags |= MG_MQTT_HAS_PASSWORD; opts.flags |= MG_MQTT_HAS_PASSWORD;
} }
if (opts.will_topic != NULL && opts.will_message != NULL) { if (opts.will_topic != NULL && opts.will_message != NULL) {
wt_len = strlen(opts.will_topic);
wm_len = strlen(opts.will_message);
opts.flags |= MG_MQTT_HAS_WILL; opts.flags |= MG_MQTT_HAS_WILL;
} }
if (opts.keep_alive == 0) { if (opts.keep_alive == 0) {
opts.keep_alive = 60; opts.keep_alive = 60;
} }
if (opts.flags & MG_MQTT_HAS_WILL) {
total_len += 2 + wt_len + 2 + wm_len;
}
if (opts.flags & MG_MQTT_HAS_USER_NAME) {
user_len = strlen(opts.user_name);
total_len += 2 + user_len;
}
if (opts.flags & MG_MQTT_HAS_PASSWORD) {
pw_len = strlen(opts.password);
total_len += 2 + pw_len;
}
mg_send_mqtt_header(nc, MG_MQTT_CMD_CONNECT, 0, total_len);
mg_send(nc, "\00\04MQTT\04", 7);
mg_send(nc, &opts.flags, 1); mg_send(nc, &opts.flags, 1);
rem_len += 1;
nlen = htons(opts.keep_alive); netbytes = htons(opts.keep_alive);
mg_send(nc, &nlen, 2); mg_send(nc, &netbytes, 2);
rem_len += 2;
hlen = strlen(client_id); netbytes = htons(id_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, client_id, id_len);
mg_send(nc, client_id, hlen);
rem_len += 2 + hlen;
if (opts.flags & MG_MQTT_HAS_WILL) { if (opts.flags & MG_MQTT_HAS_WILL) {
hlen = strlen(opts.will_topic); netbytes = htons(wt_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.will_topic, wt_len);
mg_send(nc, opts.will_topic, hlen);
rem_len += 2 + hlen;
hlen = strlen(opts.will_message); netbytes = htons(wm_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.will_message, wm_len);
mg_send(nc, opts.will_message, hlen);
rem_len += 2 + hlen;
} }
if (opts.flags & MG_MQTT_HAS_USER_NAME) { if (opts.flags & MG_MQTT_HAS_USER_NAME) {
hlen = strlen(opts.user_name); netbytes = htons(user_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.user_name, user_len);
mg_send(nc, opts.user_name, hlen);
rem_len += 2 + hlen;
} }
if (opts.flags & MG_MQTT_HAS_PASSWORD) { if (opts.flags & MG_MQTT_HAS_PASSWORD) {
hlen = strlen(opts.password); netbytes = htons(pw_len);
nlen = htons((uint16_t) hlen); mg_send(nc, &netbytes, 2);
mg_send(nc, &nlen, 2); mg_send(nc, opts.password, pw_len);
mg_send(nc, opts.password, hlen);
rem_len += 2 + hlen;
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_CONNECT, 0, rem_len);
if (pd != NULL) { if (pd != NULL) {
pd->keep_alive = opts.keep_alive; pd->keep_alive = opts.keep_alive;
} }
...@@ -331,40 +333,52 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id, ...@@ -331,40 +333,52 @@ void mg_send_mqtt_handshake_opt(struct mg_connection *nc, const char *client_id,
void mg_mqtt_publish(struct mg_connection *nc, const char *topic, void mg_mqtt_publish(struct mg_connection *nc, const char *topic,
uint16_t message_id, int flags, const void *data, uint16_t message_id, int flags, const void *data,
size_t len) { size_t len) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t topic_len = strlen(topic);
uint16_t topic_len = htons((uint16_t) strlen(topic)); size_t total_len = 2 + topic_len + len;
uint16_t message_id_net = htons(message_id); if (MG_MQTT_GET_QOS(flags) > 0) {
total_len += 2;
}
mg_send_mqtt_header(nc, MG_MQTT_CMD_PUBLISH, flags, total_len);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topic, topic_len);
mg_send(nc, &topic_len, 2);
mg_send(nc, topic, strlen(topic));
if (MG_MQTT_GET_QOS(flags) > 0) { if (MG_MQTT_GET_QOS(flags) > 0) {
mg_send(nc, &message_id_net, 2); netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
} }
mg_send(nc, data, len);
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PUBLISH, flags, mg_send(nc, data, len);
nc->send_mbuf.len - old_len);
} }
void mg_mqtt_subscribe(struct mg_connection *nc, void mg_mqtt_subscribe(struct mg_connection *nc,
const struct mg_mqtt_topic_expression *topics, const struct mg_mqtt_topic_expression *topics,
size_t topics_len, uint16_t message_id) { size_t topics_len, uint16_t message_id) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t message_id_n = htons(message_id);
size_t i; size_t i;
uint16_t topic_len;
size_t total_len = 2;
mg_send(nc, (char *) &message_id_n, 2);
for (i = 0; i < topics_len; i++) { for (i = 0; i < topics_len; i++) {
uint16_t topic_len_n = htons((uint16_t) strlen(topics[i].topic)); total_len += 2 + strlen(topics[i].topic) + 1;
mg_send(nc, &topic_len_n, 2);
mg_send(nc, topics[i].topic, strlen(topics[i].topic));
mg_send(nc, &topics[i].qos, 1);
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_SUBSCRIBE, MG_MQTT_QOS(1), mg_send_mqtt_header(nc, MG_MQTT_CMD_SUBSCRIBE, MG_MQTT_QOS(1), total_len);
nc->send_mbuf.len - old_len);
netbytes = htons(message_id);
mg_send(nc, (char *) &netbytes, 2);
for (i = 0; i < topics_len; i++) {
topic_len = strlen(topics[i].topic);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topics[i].topic, topic_len);
mg_send(nc, &topics[i].qos, 1);
}
} }
int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg, int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
...@@ -384,27 +398,33 @@ int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg, ...@@ -384,27 +398,33 @@ int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
void mg_mqtt_unsubscribe(struct mg_connection *nc, char **topics, void mg_mqtt_unsubscribe(struct mg_connection *nc, char **topics,
size_t topics_len, uint16_t message_id) { size_t topics_len, uint16_t message_id) {
size_t old_len = nc->send_mbuf.len; uint16_t netbytes;
uint16_t message_id_n = htons(message_id);
size_t i; size_t i;
uint16_t topic_len;
size_t total_len = 2;
mg_send(nc, (char *) &message_id_n, 2);
for (i = 0; i < topics_len; i++) { for (i = 0; i < topics_len; i++) {
uint16_t topic_len_n = htons((uint16_t) strlen(topics[i])); total_len += 2 + strlen(topics[i]);
mg_send(nc, &topic_len_n, 2);
mg_send(nc, topics[i], strlen(topics[i]));
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_UNSUBSCRIBE, MG_MQTT_QOS(1), mg_send_mqtt_header(nc, MG_MQTT_CMD_UNSUBSCRIBE, MG_MQTT_QOS(1), total_len);
nc->send_mbuf.len - old_len);
netbytes = htons(message_id);
mg_send(nc, (char *) &netbytes, 2);
for (i = 0; i < topics_len; i++) {
topic_len = strlen(topics[i]);
netbytes = htons(topic_len);
mg_send(nc, &netbytes, 2);
mg_send(nc, topics[i], topic_len);
}
} }
void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) { void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) {
uint8_t unused = 0; uint8_t unused = 0;
mg_send_mqtt_header(nc, MG_MQTT_CMD_CONNACK, 0, 2);
mg_send(nc, &unused, 1); mg_send(nc, &unused, 1);
mg_send(nc, &return_code, 1); mg_send(nc, &return_code, 1);
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_CONNACK, 0, 2);
} }
/* /*
...@@ -414,10 +434,13 @@ void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) { ...@@ -414,10 +434,13 @@ void mg_mqtt_connack(struct mg_connection *nc, uint8_t return_code) {
*/ */
static void mg_send_mqtt_short_command(struct mg_connection *nc, uint8_t cmd, static void mg_send_mqtt_short_command(struct mg_connection *nc, uint8_t cmd,
uint16_t message_id) { uint16_t message_id) {
uint16_t message_id_net = htons(message_id); uint16_t netbytes;
uint8_t flags = (cmd == MG_MQTT_CMD_PUBREL ? 2 : 0); uint8_t flags = (cmd == MG_MQTT_CMD_PUBREL ? 2 : 0);
mg_send(nc, &message_id_net, 2);
mg_mqtt_prepend_header(nc, cmd, flags, 2 /* len */); mg_send_mqtt_header(nc, cmd, flags, 2 /* len */);
netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
} }
void mg_mqtt_puback(struct mg_connection *nc, uint16_t message_id) { void mg_mqtt_puback(struct mg_connection *nc, uint16_t message_id) {
...@@ -439,12 +462,16 @@ void mg_mqtt_pubcomp(struct mg_connection *nc, uint16_t message_id) { ...@@ -439,12 +462,16 @@ void mg_mqtt_pubcomp(struct mg_connection *nc, uint16_t message_id) {
void mg_mqtt_suback(struct mg_connection *nc, uint8_t *qoss, size_t qoss_len, void mg_mqtt_suback(struct mg_connection *nc, uint8_t *qoss, size_t qoss_len,
uint16_t message_id) { uint16_t message_id) {
size_t i; size_t i;
uint16_t message_id_net = htons(message_id); uint16_t netbytes;
mg_send(nc, &message_id_net, 2);
mg_send_mqtt_header(nc, MG_MQTT_CMD_SUBACK, MG_MQTT_QOS(1), 2 + qoss_len);
netbytes = htons(message_id);
mg_send(nc, &netbytes, 2);
for (i = 0; i < qoss_len; i++) { for (i = 0; i < qoss_len; i++) {
mg_send(nc, &qoss[i], 1); mg_send(nc, &qoss[i], 1);
} }
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_SUBACK, MG_MQTT_QOS(1), 2 + qoss_len);
} }
void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) { void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) {
...@@ -452,15 +479,15 @@ void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) { ...@@ -452,15 +479,15 @@ void mg_mqtt_unsuback(struct mg_connection *nc, uint16_t message_id) {
} }
void mg_mqtt_ping(struct mg_connection *nc) { void mg_mqtt_ping(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PINGREQ, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_PINGREQ, 0, 0);
} }
void mg_mqtt_pong(struct mg_connection *nc) { void mg_mqtt_pong(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_PINGRESP, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_PINGRESP, 0, 0);
} }
void mg_mqtt_disconnect(struct mg_connection *nc) { void mg_mqtt_disconnect(struct mg_connection *nc) {
mg_mqtt_prepend_header(nc, MG_MQTT_CMD_DISCONNECT, 0, 0); mg_send_mqtt_header(nc, MG_MQTT_CMD_DISCONNECT, 0, 0);
} }
#endif /* MG_ENABLE_MQTT */ #endif /* MG_ENABLE_MQTT */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment