/*
 * Copyright (c) 2014 Cesanta Software Limited
 * All rights reserved
 */

#include "mg_internal.h"
#include "mg_mqtt_server.h"

#if MG_ENABLE_MQTT_BROKER

/*
 * Put session `s` into its initial state: bound to broker `brk` and
 * connection `nc`, with an empty subscription list.
 */
static void mg_mqtt_session_init(struct mg_mqtt_broker *brk,
                                 struct mg_mqtt_session *s,
                                 struct mg_connection *nc) {
  s->brk = brk;
  s->subscriptions = NULL;
  s->num_subscriptions = 0;
  s->nc = nc;
}

/* Link session `s` at the head of its broker's session list. */
static void mg_mqtt_add_session(struct mg_mqtt_session *s) {
  LIST_INSERT_HEAD(&s->brk->sessions, s, link);
}

/* Unlink session `s` from the session list it is on. Does not free it. */
static void mg_mqtt_remove_session(struct mg_mqtt_session *s) {
  LIST_REMOVE(s, link);
}

/*
 * Free every subscription topic string, the subscription array and the
 * session object itself. `s` must not be used afterwards.
 */
static void mg_mqtt_destroy_session(struct mg_mqtt_session *s) {
  size_t i;
  for (i = 0; i < s->num_subscriptions; i++) {
    MG_FREE((void *) s->subscriptions[i].topic);
  }
  MG_FREE(s->subscriptions);
  MG_FREE(s);
}

/* Unlink session `s` from the broker and release all memory it owns. */
static void mg_mqtt_close_session(struct mg_mqtt_session *s) {
  mg_mqtt_remove_session(s);
  mg_mqtt_destroy_session(s);
}

/*
 * Initialise broker `brk` with an empty session list and remember the
 * caller-supplied `user_data` pointer.
 */
void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) {
  LIST_INIT(&brk->sessions);
  brk->user_data = user_data;
}

/*
 * Handle an MQTT CONNECT: allocate a session for `nc`, store it in
 * `nc->priv_2`, register it with `brk` and reply with CONNACK "accepted".
 * If the session cannot be allocated, CONNACK "server unavailable" is sent
 * and `nc->priv_2` is left untouched.
 */
static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
                                          struct mg_connection *nc) {
  struct mg_mqtt_session *s =
      (struct mg_mqtt_session *) MG_CALLOC(1, sizeof *s);
  if (s == NULL) {
    /* LCOV_EXCL_START */
    mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_SERVER_UNAVAILABLE);
    return;
    /* LCOV_EXCL_STOP */
  }

  /* TODO(mkm): check header (magic and version) */

  mg_mqtt_session_init(brk, s, nc);
  nc->priv_2 = s;
  mg_mqtt_add_session(s);

  mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_ACCEPTED);
}

/*
 * Handle an MQTT SUBSCRIBE for the session stored in `nc->priv_2`.
 *
 * The payload is walked twice: the first pass counts the topics and collects
 * the requested QoS values (rejecting the request if the per-session limit
 * would be exceeded), the second pass copies each topic into the session's
 * subscription array. On success a SUBACK carrying the collected QoS values
 * is sent; on any failure the connection is flagged for immediate close.
 */
static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
                                            struct mg_mqtt_message *msg) {
  struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->priv_2;
  uint8_t qoss[MG_MQTT_MAX_SESSION_SUBSCRIPTIONS];
  size_t num_subs = 0;
  struct mg_str topic;
  uint8_t qos;
  int pos;
  struct mg_mqtt_topic_expression *te;

  /* Pass 1: count topics and record the QoS requested for each of them. */
  for (pos = 0;
       (pos = mg_mqtt_next_subscribe_topic(msg, &topic, &qos, pos)) != -1;) {
    /*
     * Bound the write into qoss[] by the array's element count. (Taking
     * sizeof of the limit macro itself yields the size of an integer
     * constant, which wrongly capped a request at a handful of topics.)
     */
    if (num_subs >= sizeof(qoss) / sizeof(qoss[0]) ||
        (ss->num_subscriptions + num_subs >=
         MG_MQTT_MAX_SESSION_SUBSCRIPTIONS)) {
      nc->flags |= MG_F_CLOSE_IMMEDIATELY;
      return;
    }
    qoss[num_subs++] = qos;
  }

  if (num_subs > 0) {
    /* Grow via a temporary so the old array survives a failed realloc. */
    te = (struct mg_mqtt_topic_expression *) MG_REALLOC(
        ss->subscriptions,
        sizeof(*ss->subscriptions) * (ss->num_subscriptions + num_subs));
    if (te == NULL) {
      nc->flags |= MG_F_CLOSE_IMMEDIATELY;
      return;
    }
    ss->subscriptions = te;

    /* Pass 2: store a NUL-terminated copy of every topic. */
    for (pos = 0;
         pos < (int) msg->payload.len &&
         (pos = mg_mqtt_next_subscribe_topic(msg, &topic, &qos, pos)) != -1;
         ss->num_subscriptions++) {
      char *topic_copy = (char *) MG_MALLOC(topic.len + 1);
      if (topic_copy == NULL) {
        /*
         * Out of memory. Entries stored so far are already counted in
         * num_subscriptions and are released when the session is destroyed;
         * the current entry is not counted, so nothing uninitialised is
         * ever freed.
         */
        nc->flags |= MG_F_CLOSE_IMMEDIATELY;
        return;
      }
      memcpy(topic_copy, topic.p, topic.len);
      topic_copy[topic.len] = '\0';

      te = &ss->subscriptions[ss->num_subscriptions];
      te->topic = topic_copy;
      te->qos = qos;
    }
  }

  if (pos == (int) msg->payload.len) {
    mg_mqtt_suback(nc, qoss, num_subs, msg->message_id);
  } else {
    /* We did not fully parse the payload, something must be wrong. */
    nc->flags |= MG_F_CLOSE_IMMEDIATELY;
  }
}

/*
 * Handle an MQTT PUBLISH: forward the message to every session of `brk`
 * that has at least one subscription matching the message topic. Each
 * session receives the message at most once (the subscription scan stops at
 * the first match). Forwarding stops early if the topic string cannot be
 * formatted.
 */
static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
                                          struct mg_mqtt_message *msg) {
  struct mg_mqtt_session *s;
  size_t i;

  for (s = mg_mqtt_next(brk, NULL); s != NULL; s = mg_mqtt_next(brk, s)) {
    for (i = 0; i < s->num_subscriptions; i++) {
      if (mg_mqtt_vmatch_topic_expression(s->subscriptions[i].topic,
                                          msg->topic)) {
        /*
         * msg->topic is a pointer/length pair; build a NUL-terminated copy
         * for mg_mqtt_publish(). mg_asprintf() starts with the stack buffer
         * and may replace `p` with another buffer, which must then be freed.
         */
        char buf[100], *p = buf;
        mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len,
                    msg->topic.p);
        if (p == NULL) {
          return;
        }
        mg_mqtt_publish(s->nc, p, 0, 0, msg->payload.p, msg->payload.len);
        if (p != buf) {
          MG_FREE(p);
        }
        break;
      }
    }
  }
}

/*
 * Event handler implementing the MQTT broker.
 *
 * `priv_2` is used in two ways: when `nc` has a listener, the broker is read
 * from the listener's `priv_2` and `nc->priv_2` holds this client's session
 * (NULL until a CONNECT has been accepted); otherwise the broker is read
 * from `nc->priv_2` itself.
 */
void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
  struct mg_mqtt_message *msg = (struct mg_mqtt_message *) data;
  struct mg_mqtt_broker *brk;

  if (nc->listener) {
    brk = (struct mg_mqtt_broker *) nc->listener->priv_2;
  } else {
    brk = (struct mg_mqtt_broker *) nc->priv_2;
  }

  switch (ev) {
    case MG_EV_ACCEPT:
      if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc);
      nc->priv_2 = NULL; /* Clear up the inherited pointer to broker */
      break;
    case MG_EV_MQTT_CONNECT:
      if (nc->priv_2 == NULL) {
        mg_mqtt_broker_handle_connect(brk, nc);
      } else {
        /* Repeated CONNECT */
        nc->flags |= MG_F_CLOSE_IMMEDIATELY;
      }
      break;
    case MG_EV_MQTT_SUBSCRIBE:
      if (nc->priv_2 != NULL) {
        mg_mqtt_broker_handle_subscribe(nc, msg);
      } else {
        /* Subscribe before CONNECT */
        nc->flags |= MG_F_CLOSE_IMMEDIATELY;
      }
      break;
    case MG_EV_MQTT_PUBLISH:
      if (nc->priv_2 != NULL) {
        mg_mqtt_broker_handle_publish(brk, msg);
      } else {
        /* Publish before CONNECT */
        nc->flags |= MG_F_CLOSE_IMMEDIATELY;
      }
      break;
    case MG_EV_CLOSE:
      /*
       * Only connections that have a listener store a session in priv_2;
       * without a listener priv_2 is interpreted as the broker (see above)
       * and must not be freed here.
       */
      if (nc->listener && nc->priv_2 != NULL) {
        mg_mqtt_close_session((struct mg_mqtt_session *) nc->priv_2);
      }
      break;
    default:
      /* All other events are ignored by the broker. */
      break;
  }
}

/*
 * Session iterator: returns the first session of `brk` when `s` is NULL,
 * otherwise the session following `s` in the list.
 */
struct mg_mqtt_session *mg_mqtt_next(struct mg_mqtt_broker *brk,
                                     struct mg_mqtt_session *s) {
  return s == NULL ? LIST_FIRST(&brk->sessions) : LIST_NEXT(s, link);
}

#endif /* MG_ENABLE_MQTT_BROKER */