Commit f1b4f0c4 authored by Sergey Lyubka's avatar Sergey Lyubka Committed by Cesanta Bot

Fix SEGFAULT in MQTT server

PUBLISHED_FROM=6455082726781aab819a7f8645e81adaa7fc7669
parent c04e3f80
---
title: "LIST_ENTRY()"
decl_name: "LIST_ENTRY"
symbol_kind: "func"
signature: |
LIST_ENTRY(mg_mqtt_session);
---
Broker
...@@ -3,9 +3,7 @@ title: "MQTT Server API reference" ...@@ -3,9 +3,7 @@ title: "MQTT Server API reference"
symbol_kind: "intro" symbol_kind: "intro"
decl_name: "mqtt_server.h" decl_name: "mqtt_server.h"
items: items:
- { name: mg_mqtt_broker.md } - { name: LIST_ENTRY.md }
- { name: mg_mqtt_broker_init.md }
- { name: mg_mqtt_next.md }
- { name: struct_mg_mqtt_broker.md } - { name: struct_mg_mqtt_broker.md }
- { name: struct_mg_mqtt_session.md } - { name: struct_mg_mqtt_session.md }
--- ---
......
---
title: "mg_mqtt_broker()"
decl_name: "mg_mqtt_broker"
symbol_kind: "func"
signature: |
void mg_mqtt_broker(struct mg_connection *brk, int ev, void *data);
---
Processes a MQTT broker message.
The listening connection expects a pointer to an initialised
`mg_mqtt_broker` structure in the `user_data` field.
Basic usage:
```c
mg_mqtt_broker_init(&brk, NULL);
if ((nc = mg_bind(&mgr, address, mg_mqtt_broker)) == NULL) {
// fail;
}
nc->user_data = &brk;
```
New incoming connections will receive a `mg_mqtt_session` structure
in the connection `user_data`. The original `user_data` will be stored
in the `user_data` field of the session structure. This allows the user
handler to store user data before `mg_mqtt_broker` creates the session.
Since only the MG_EV_ACCEPT message is processed by the listening socket,
for most events the `user_data` will thus point to a `mg_mqtt_session`.
---
title: "mg_mqtt_broker_init()"
decl_name: "mg_mqtt_broker_init"
symbol_kind: "func"
signature: |
void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data);
---
Initialises a MQTT broker.
---
title: "mg_mqtt_next()"
decl_name: "mg_mqtt_next"
symbol_kind: "func"
signature: |
struct mg_mqtt_session *mg_mqtt_next(struct mg_mqtt_broker *brk,
struct mg_mqtt_session *s);
---
Iterates over all MQTT session connections. Example:
```c
struct mg_mqtt_session *s;
for (s = mg_mqtt_next(brk, NULL); s != NULL; s = mg_mqtt_next(brk, s)) {
// Do something
}
```
...@@ -4,7 +4,7 @@ decl_name: "struct mg_mqtt_broker" ...@@ -4,7 +4,7 @@ decl_name: "struct mg_mqtt_broker"
symbol_kind: "struct" symbol_kind: "struct"
signature: | signature: |
struct mg_mqtt_broker { struct mg_mqtt_broker {
struct mg_mqtt_session *sessions; /* Session list */ LIST_HEAD(, mg_mqtt_session) sessions; /* Session list */
void *user_data; /* User data */ void *user_data; /* User data */
}; };
--- ---
......
...@@ -5,11 +5,11 @@ symbol_kind: "struct" ...@@ -5,11 +5,11 @@ symbol_kind: "struct"
signature: | signature: |
struct mg_mqtt_session { struct mg_mqtt_session {
struct mg_mqtt_broker *brk; /* Broker */ struct mg_mqtt_broker *brk; /* Broker */
struct mg_mqtt_session *next, *prev; /* mg_mqtt_broker::sessions linkage */ LIST_ENTRY(mg_mqtt_session) link; /* mg_mqtt_broker::sessions linkage */
struct mg_connection *nc; /* Connection with the client */ struct mg_connection *nc; /* Connection with the client */
size_t num_subscriptions; /* Size of `subscriptions` array */ size_t num_subscriptions; /* Size of `subscriptions` array */
struct mg_mqtt_topic_expression *subscriptions;
void *user_data; /* User data */ void *user_data; /* User data */
struct mg_mqtt_topic_expression *subscriptions;
}; };
--- ---
......
...@@ -8595,7 +8595,7 @@ void mg_mqtt_disconnect(struct mg_connection *nc) { ...@@ -8595,7 +8595,7 @@ void mg_mqtt_disconnect(struct mg_connection *nc) {
*/ */
/* Amalgamated: #include "mongoose/src/internal.h" */ /* Amalgamated: #include "mongoose/src/internal.h" */
/* Amalgamated: #include "mongoose/src/mqtt-broker.h" */ /* Amalgamated: #include "mongoose/src/mqtt-server.h" */
#if MG_ENABLE_MQTT_BROKER #if MG_ENABLE_MQTT_BROKER
...@@ -8609,16 +8609,11 @@ static void mg_mqtt_session_init(struct mg_mqtt_broker *brk, ...@@ -8609,16 +8609,11 @@ static void mg_mqtt_session_init(struct mg_mqtt_broker *brk,
} }
static void mg_mqtt_add_session(struct mg_mqtt_session *s) { static void mg_mqtt_add_session(struct mg_mqtt_session *s) {
s->next = s->brk->sessions; LIST_INSERT_HEAD(&s->brk->sessions, s, link);
s->brk->sessions = s;
s->prev = NULL;
if (s->next != NULL) s->next->prev = s;
} }
static void mg_mqtt_remove_session(struct mg_mqtt_session *s) { static void mg_mqtt_remove_session(struct mg_mqtt_session *s) {
if (s->prev == NULL) s->brk->sessions = s->next; LIST_REMOVE(s, link);
if (s->prev) s->prev->next = s->next;
if (s->next) s->next->prev = s->prev;
} }
static void mg_mqtt_destroy_session(struct mg_mqtt_session *s) { static void mg_mqtt_destroy_session(struct mg_mqtt_session *s) {
...@@ -8636,7 +8631,7 @@ static void mg_mqtt_close_session(struct mg_mqtt_session *s) { ...@@ -8636,7 +8631,7 @@ static void mg_mqtt_close_session(struct mg_mqtt_session *s) {
} }
void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) { void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) {
brk->sessions = NULL; LIST_EMPTY(&brk->sessions);
brk->user_data = user_data; brk->user_data = user_data;
} }
...@@ -8662,7 +8657,6 @@ static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk, ...@@ -8662,7 +8657,6 @@ static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc, static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
struct mg_mqtt_message *msg) { struct mg_mqtt_message *msg) {
struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->user_data; struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->user_data;
uint8_t qoss[512]; uint8_t qoss[512];
size_t qoss_len = 0; size_t qoss_len = 0;
...@@ -8676,7 +8670,6 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc, ...@@ -8676,7 +8670,6 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
qoss[qoss_len++] = qos; qoss[qoss_len++] = qos;
} }
ss->subscriptions = (struct mg_mqtt_topic_expression *) realloc( ss->subscriptions = (struct mg_mqtt_topic_expression *) realloc(
ss->subscriptions, sizeof(*ss->subscriptions) * qoss_len); ss->subscriptions, sizeof(*ss->subscriptions) * qoss_len);
for (pos = 0; for (pos = 0;
...@@ -8721,7 +8714,8 @@ static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk, ...@@ -8721,7 +8714,8 @@ static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
if (mg_mqtt_match_topic_expression(s->subscriptions[i].topic, if (mg_mqtt_match_topic_expression(s->subscriptions[i].topic,
&msg->topic)) { &msg->topic)) {
char buf[100], *p = buf; char buf[100], *p = buf;
mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len, msg->topic.p); mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len,
msg->topic.p);
if (p == NULL) { if (p == NULL) {
return; return;
} }
...@@ -8748,6 +8742,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) { ...@@ -8748,6 +8742,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
switch (ev) { switch (ev) {
case MG_EV_ACCEPT: case MG_EV_ACCEPT:
mg_set_protocol_mqtt(nc); mg_set_protocol_mqtt(nc);
nc->user_data = NULL; // This is NOT a listening connection
break; break;
case MG_EV_MQTT_CONNECT: case MG_EV_MQTT_CONNECT:
mg_mqtt_broker_handle_connect(brk, nc); mg_mqtt_broker_handle_connect(brk, nc);
...@@ -8768,7 +8763,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) { ...@@ -8768,7 +8763,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
struct mg_mqtt_session *mg_mqtt_next(struct mg_mqtt_broker *brk, struct mg_mqtt_session *mg_mqtt_next(struct mg_mqtt_broker *brk,
struct mg_mqtt_session *s) { struct mg_mqtt_session *s) {
return s == NULL ? brk->sessions : s->next; return s == NULL ? LIST_FIRST(&brk->sessions) : LIST_NEXT(s, link);
} }
#endif /* MG_ENABLE_MQTT_BROKER */ #endif /* MG_ENABLE_MQTT_BROKER */
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment