Commit 1bf3ad24 authored by Deomid "rojer" Ryabkov's avatar Deomid "rojer" Ryabkov Committed by Cesanta Bot

MQTT ping fixes

 * Actually drop the connection when no response to ping arrives within the next interval.
 * Avoid sending immediate ping when wall time is adjusted, it's usually spurious.

PUBLISHED_FROM=8049280b58edfb94dd0fcb6a1e89ffefe69bcea1
parent 884b9a48
...@@ -48,11 +48,14 @@ signature: | ...@@ -48,11 +48,14 @@ signature: |
/* Flags that are settable by user */ /* Flags that are settable by user */
#define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */ #define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */
#define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */ #define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */
#define MG_F_WEBSOCKET_NO_DEFRAG (1 << 12) /* Websocket specific */
#define MG_F_DELETE_CHUNK (1 << 13) /* HTTP specific */ /* Flags for protocol handlers */
#define MG_F_PROTO_1 (1 << 12)
#define MG_F_PROTO_2 (1 << 13)
#define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */ #define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */
#define MG_F_USER_1 (1 << 20) /* Flags left for application */ /* Flags left for application */
#define MG_F_USER_1 (1 << 20)
#define MG_F_USER_2 (1 << 21) #define MG_F_USER_2 (1 << 21)
#define MG_F_USER_3 (1 << 22) #define MG_F_USER_3 (1 << 22)
#define MG_F_USER_4 (1 << 23) #define MG_F_USER_4 (1 << 23)
......
...@@ -3746,7 +3746,7 @@ static int mg_socket_if_udp_send(struct mg_connection *nc, const void *buf, ...@@ -3746,7 +3746,7 @@ static int mg_socket_if_udp_send(struct mg_connection *nc, const void *buf,
static int mg_socket_if_tcp_recv(struct mg_connection *nc, void *buf, static int mg_socket_if_tcp_recv(struct mg_connection *nc, void *buf,
size_t len) { size_t len) {
int n = (int) MG_RECV_FUNC(nc->sock, buf, len, 0); int n = (int) MG_RECV_FUNC(nc->sock, buf, len, MSG_DONTWAIT);
if (n == 0) { if (n == 0) {
/* Orderly shutdown of the socket, try flushing output. */ /* Orderly shutdown of the socket, try flushing output. */
nc->flags |= MG_F_SEND_AND_CLOSE; nc->flags |= MG_F_SEND_AND_CLOSE;
...@@ -3922,8 +3922,8 @@ void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, double now) { ...@@ -3922,8 +3922,8 @@ void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, double now) {
#if MG_ENABLE_BROADCAST #if MG_ENABLE_BROADCAST
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
struct ctl_msg ctl_msg; struct ctl_msg ctl_msg;
int len = int len = (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg),
(int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0); MSG_DONTWAIT);
size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0); size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
DBG(("read %d from ctl socket", len)); DBG(("read %d from ctl socket", len));
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
...@@ -5965,7 +5965,8 @@ static struct mg_str mg_get_mime_types_entry(struct mg_str path) { ...@@ -5965,7 +5965,8 @@ static struct mg_str mg_get_mime_types_entry(struct mg_str path) {
size_t i; size_t i;
for (i = 0; mg_static_builtin_mime_types[i].extension != NULL; i++) { for (i = 0; mg_static_builtin_mime_types[i].extension != NULL; i++) {
if (path.len < mg_static_builtin_mime_types[i].ext_len + 1) continue; if (path.len < mg_static_builtin_mime_types[i].ext_len + 1) continue;
struct mg_str ext = MG_MK_STR_N(mg_static_builtin_mime_types[i].extension, mg_static_builtin_mime_types[i].ext_len); struct mg_str ext = MG_MK_STR_N(mg_static_builtin_mime_types[i].extension,
mg_static_builtin_mime_types[i].ext_len);
struct mg_str pext = MG_MK_STR_N(path.p + (path.len - ext.len), ext.len); struct mg_str pext = MG_MK_STR_N(path.p + (path.len - ext.len), ext.len);
if (pext.p[-1] == '.' && mg_strcasecmp(ext, pext) == 0) { if (pext.p[-1] == '.' && mg_strcasecmp(ext, pext) == 0) {
return mg_mk_str(mg_static_builtin_mime_types[i].mime_type); return mg_mk_str(mg_static_builtin_mime_types[i].mime_type);
...@@ -5974,9 +5975,9 @@ static struct mg_str mg_get_mime_types_entry(struct mg_str path) { ...@@ -5974,9 +5975,9 @@ static struct mg_str mg_get_mime_types_entry(struct mg_str path) {
return mg_mk_str(NULL); return mg_mk_str(NULL);
} }
static int mg_get_mime_type_encoding(struct mg_str path, struct mg_str *type, MG_INTERNAL int mg_get_mime_type_encoding(
struct mg_str *encoding, struct mg_str path, struct mg_str *type, struct mg_str *encoding,
const struct mg_serve_http_opts *opts) { const struct mg_serve_http_opts *opts) {
const char *ext, *overrides; const char *ext, *overrides;
struct mg_str k, v; struct mg_str k, v;
...@@ -5995,7 +5996,8 @@ static int mg_get_mime_type_encoding(struct mg_str path, struct mg_str *type, ...@@ -5995,7 +5996,8 @@ static int mg_get_mime_type_encoding(struct mg_str path, struct mg_str *type,
if (mg_vcmp(type, "application/x-gunzip") == 0) { if (mg_vcmp(type, "application/x-gunzip") == 0) {
struct mg_str path2 = mg_mk_str_n(path.p, path.len - 3); struct mg_str path2 = mg_mk_str_n(path.p, path.len - 3);
struct mg_str type2 = mg_get_mime_types_entry(path2); struct mg_str type2 = mg_get_mime_types_entry(path2);
LOG(LL_ERROR, ("'%.*s' '%.*s' '%.*s'", (int) path.len, path.p, (int) path2.len, path2.p, (int) type2.len, type2.p)); LOG(LL_ERROR, ("'%.*s' '%.*s' '%.*s'", (int) path.len, path.p,
(int) path2.len, path2.p, (int) type2.len, type2.p));
if (type2.len > 0) { if (type2.len > 0) {
*type = type2; *type = type2;
*encoding = mg_mk_str("gzip"); *encoding = mg_mk_str("gzip");
...@@ -9477,7 +9479,8 @@ MG_INTERNAL void mg_handle_ssi_request(struct mg_connection *nc, ...@@ -9477,7 +9479,8 @@ MG_INTERNAL void mg_handle_ssi_request(struct mg_connection *nc,
} else { } else {
mg_set_close_on_exec((sock_t) fileno(fp)); mg_set_close_on_exec((sock_t) fileno(fp));
if (!mg_get_mime_type_encoding(mg_mk_str(path), &mime_type, &encoding, opts)) { if (!mg_get_mime_type_encoding(mg_mk_str(path), &mime_type, &encoding,
opts)) {
mime_type = mg_mk_str("text/plain"); mime_type = mg_mk_str("text/plain");
} }
mg_send_response_line(nc, 200, opts->extra_headers); mg_send_response_line(nc, 200, opts->extra_headers);
...@@ -9486,7 +9489,8 @@ MG_INTERNAL void mg_handle_ssi_request(struct mg_connection *nc, ...@@ -9486,7 +9489,8 @@ MG_INTERNAL void mg_handle_ssi_request(struct mg_connection *nc,
"Connection: close\r\n", "Connection: close\r\n",
(int) mime_type.len, mime_type.p); (int) mime_type.len, mime_type.p);
if (encoding.len > 0) { if (encoding.len > 0) {
mg_printf(nc, "Content-Encoding: %.*s\r\n", (int) encoding.len, encoding.p); mg_printf(nc, "Content-Encoding: %.*s\r\n", (int) encoding.len,
encoding.p);
} }
mg_send(nc, "\r\n", 2); mg_send(nc, "\r\n", 2);
mg_send_ssi_file(nc, hm, path, fp, 0, opts); mg_send_ssi_file(nc, hm, path, fp, 0, opts);
...@@ -10636,6 +10640,8 @@ struct mg_str mg_url_encode(const struct mg_str src) { ...@@ -10636,6 +10640,8 @@ struct mg_str mg_url_encode(const struct mg_str src) {
/* Amalgamated: #include "mg_internal.h" */ /* Amalgamated: #include "mg_internal.h" */
/* Amalgamated: #include "mg_mqtt.h" */ /* Amalgamated: #include "mg_mqtt.h" */
#define MG_F_MQTT_PING_PENDING MG_F_PROTO_1
static uint16_t getu16(const char *p) { static uint16_t getu16(const char *p) {
const uint8_t *up = (const uint8_t *) p; const uint8_t *up = (const uint8_t *) p;
return (up[0] << 8) + up[1]; return (up[0] << 8) + up[1];
...@@ -10798,6 +10804,10 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -10798,6 +10804,10 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
} }
break; break;
} }
if (mm.cmd == MG_MQTT_CMD_PINGRESP) {
LOG(LL_DEBUG, ("Recv PINGRESP"));
nc->flags &= ~MG_F_MQTT_PING_PENDING;
}
nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data)); nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data));
mbuf_remove(io, len); mbuf_remove(io, len);
...@@ -10808,10 +10818,26 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -10808,10 +10818,26 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
struct mg_mqtt_proto_data *pd = struct mg_mqtt_proto_data *pd =
(struct mg_mqtt_proto_data *) nc->proto_data; (struct mg_mqtt_proto_data *) nc->proto_data;
double now = mg_time(); double now = mg_time();
if (pd->keep_alive > 0 && pd->last_control_time > 0 && if (pd->keep_alive > 0 && pd->last_control_time > 0) {
(now - pd->last_control_time) > pd->keep_alive) { double diff = (now - pd->last_control_time);
LOG(LL_DEBUG, ("Send PINGREQ")); if (diff > pd->keep_alive) {
mg_mqtt_ping(nc); if (diff < 1500000000) {
if (!(nc->flags & MG_F_MQTT_PING_PENDING)) {
LOG(LL_DEBUG, ("Send PINGREQ"));
nc->flags |= MG_F_MQTT_PING_PENDING;
mg_mqtt_ping(nc);
} else {
LOG(LL_DEBUG, ("Ping timeout"));
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
} else {
/* Wall time has just been set. Avoid immediate ping,
* more likely than not it is not needed. The standard allows for
* 1.5X interval for ping requests, so even if were just about to
* send one, we should be ok waiting 0.4X more. */
pd->last_control_time = now - pd->keep_alive * 0.6;
}
}
} }
break; break;
} }
......
...@@ -3622,11 +3622,14 @@ struct mg_connection { ...@@ -3622,11 +3622,14 @@ struct mg_connection {
/* Flags that are settable by user */ /* Flags that are settable by user */
#define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */ #define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */
#define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */ #define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */
#define MG_F_WEBSOCKET_NO_DEFRAG (1 << 12) /* Websocket specific */
#define MG_F_DELETE_CHUNK (1 << 13) /* HTTP specific */ /* Flags for protocol handlers */
#define MG_F_PROTO_1 (1 << 12)
#define MG_F_PROTO_2 (1 << 13)
#define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */ #define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */
#define MG_F_USER_1 (1 << 20) /* Flags left for application */ /* Flags left for application */
#define MG_F_USER_1 (1 << 20)
#define MG_F_USER_2 (1 << 21) #define MG_F_USER_2 (1 << 21)
#define MG_F_USER_3 (1 << 22) #define MG_F_USER_3 (1 << 22)
#define MG_F_USER_4 (1 << 23) #define MG_F_USER_4 (1 << 23)
...@@ -4367,8 +4370,8 @@ struct mg_str mg_url_encode(const struct mg_str src); ...@@ -4367,8 +4370,8 @@ struct mg_str mg_url_encode(const struct mg_str src);
#if MG_ENABLE_HTTP #if MG_ENABLE_HTTP
/* Amalgamated: #include "mg_net.h" */
/* Amalgamated: #include "common/mg_str.h" */ /* Amalgamated: #include "common/mg_str.h" */
/* Amalgamated: #include "mg_net.h" */
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -4475,6 +4478,9 @@ struct mg_ssi_call_ctx { ...@@ -4475,6 +4478,9 @@ struct mg_ssi_call_ctx {
#define MG_EV_HTTP_MULTIPART_REQUEST_END 125 #define MG_EV_HTTP_MULTIPART_REQUEST_END 125
#endif #endif
#define MG_F_WEBSOCKET_NO_DEFRAG MG_F_PROTO_1
#define MG_F_DELETE_CHUNK MG_F_PROTO_2
/* /*
* Attaches a built-in HTTP event handler to the given connection. * Attaches a built-in HTTP event handler to the given connection.
* The user-defined event handler will receive following extra events: * The user-defined event handler will receive following extra events:
......
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
#if MG_ENABLE_HTTP #if MG_ENABLE_HTTP
#include "mg_net.h"
#include "common/mg_str.h" #include "common/mg_str.h"
#include "mg_net.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -120,6 +120,9 @@ struct mg_ssi_call_ctx { ...@@ -120,6 +120,9 @@ struct mg_ssi_call_ctx {
#define MG_EV_HTTP_MULTIPART_REQUEST_END 125 #define MG_EV_HTTP_MULTIPART_REQUEST_END 125
#endif #endif
#define MG_F_WEBSOCKET_NO_DEFRAG MG_F_PROTO_1
#define MG_F_DELETE_CHUNK MG_F_PROTO_2
/* /*
* Attaches a built-in HTTP event handler to the given connection. * Attaches a built-in HTTP event handler to the given connection.
* The user-defined event handler will receive following extra events: * The user-defined event handler will receive following extra events:
......
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
#include "mg_internal.h" #include "mg_internal.h"
#include "mg_mqtt.h" #include "mg_mqtt.h"
#define MG_F_MQTT_PING_PENDING MG_F_PROTO_1
static uint16_t getu16(const char *p) { static uint16_t getu16(const char *p) {
const uint8_t *up = (const uint8_t *) p; const uint8_t *up = (const uint8_t *) p;
return (up[0] << 8) + up[1]; return (up[0] << 8) + up[1];
...@@ -172,6 +174,10 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -172,6 +174,10 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
} }
break; break;
} }
if (mm.cmd == MG_MQTT_CMD_PINGRESP) {
LOG(LL_DEBUG, ("Recv PINGRESP"));
nc->flags &= ~MG_F_MQTT_PING_PENDING;
}
nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data)); nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm MG_UD_ARG(user_data));
mbuf_remove(io, len); mbuf_remove(io, len);
...@@ -182,10 +188,26 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -182,10 +188,26 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
struct mg_mqtt_proto_data *pd = struct mg_mqtt_proto_data *pd =
(struct mg_mqtt_proto_data *) nc->proto_data; (struct mg_mqtt_proto_data *) nc->proto_data;
double now = mg_time(); double now = mg_time();
if (pd->keep_alive > 0 && pd->last_control_time > 0 && if (pd->keep_alive > 0 && pd->last_control_time > 0) {
(now - pd->last_control_time) > pd->keep_alive) { double diff = (now - pd->last_control_time);
LOG(LL_DEBUG, ("Send PINGREQ")); if (diff > pd->keep_alive) {
mg_mqtt_ping(nc); if (diff < 1500000000) {
if (!(nc->flags & MG_F_MQTT_PING_PENDING)) {
LOG(LL_DEBUG, ("Send PINGREQ"));
nc->flags |= MG_F_MQTT_PING_PENDING;
mg_mqtt_ping(nc);
} else {
LOG(LL_DEBUG, ("Ping timeout"));
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
} else {
/* Wall time has just been set. Avoid immediate ping,
* more likely than not it is not needed. The standard allows for
* 1.5X interval for ping requests, so even if were just about to
* send one, we should be ok waiting 0.4X more. */
pd->last_control_time = now - pd->keep_alive * 0.6;
}
}
} }
break; break;
} }
......
...@@ -142,11 +142,14 @@ struct mg_connection { ...@@ -142,11 +142,14 @@ struct mg_connection {
/* Flags that are settable by user */ /* Flags that are settable by user */
#define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */ #define MG_F_SEND_AND_CLOSE (1 << 10) /* Push remaining data and close */
#define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */ #define MG_F_CLOSE_IMMEDIATELY (1 << 11) /* Disconnect */
#define MG_F_WEBSOCKET_NO_DEFRAG (1 << 12) /* Websocket specific */
#define MG_F_DELETE_CHUNK (1 << 13) /* HTTP specific */ /* Flags for protocol handlers */
#define MG_F_PROTO_1 (1 << 12)
#define MG_F_PROTO_2 (1 << 13)
#define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */ #define MG_F_ENABLE_BROADCAST (1 << 14) /* Allow broadcast address usage */
#define MG_F_USER_1 (1 << 20) /* Flags left for application */ /* Flags left for application */
#define MG_F_USER_1 (1 << 20)
#define MG_F_USER_2 (1 << 21) #define MG_F_USER_2 (1 << 21)
#define MG_F_USER_3 (1 << 22) #define MG_F_USER_3 (1 << 22)
#define MG_F_USER_4 (1 << 23) #define MG_F_USER_4 (1 << 23)
......
...@@ -103,7 +103,7 @@ static int mg_socket_if_udp_send(struct mg_connection *nc, const void *buf, ...@@ -103,7 +103,7 @@ static int mg_socket_if_udp_send(struct mg_connection *nc, const void *buf,
static int mg_socket_if_tcp_recv(struct mg_connection *nc, void *buf, static int mg_socket_if_tcp_recv(struct mg_connection *nc, void *buf,
size_t len) { size_t len) {
int n = (int) MG_RECV_FUNC(nc->sock, buf, len, 0); int n = (int) MG_RECV_FUNC(nc->sock, buf, len, MSG_DONTWAIT);
if (n == 0) { if (n == 0) {
/* Orderly shutdown of the socket, try flushing output. */ /* Orderly shutdown of the socket, try flushing output. */
nc->flags |= MG_F_SEND_AND_CLOSE; nc->flags |= MG_F_SEND_AND_CLOSE;
...@@ -279,8 +279,8 @@ void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, double now) { ...@@ -279,8 +279,8 @@ void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, double now) {
#if MG_ENABLE_BROADCAST #if MG_ENABLE_BROADCAST
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
struct ctl_msg ctl_msg; struct ctl_msg ctl_msg;
int len = int len = (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg),
(int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0); MSG_DONTWAIT);
size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0); size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
DBG(("read %d from ctl socket", len)); DBG(("read %d from ctl socket", len));
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment