Commit aa2ff674 authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by rojer

Call recved when data is consumed from recv_mbuf

    PUBLISHED_FROM=5fb212ed3114c57182781d441b6886cf83100a76
parent de24bfdf
...@@ -79,7 +79,6 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, ...@@ -79,7 +79,6 @@ MG_INTERNAL void mg_call(struct mg_connection *nc,
void mg_forward(struct mg_connection *from, struct mg_connection *to); void mg_forward(struct mg_connection *from, struct mg_connection *to);
MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c); MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c);
MG_INTERNAL void mg_remove_conn(struct mg_connection *c); MG_INTERNAL void mg_remove_conn(struct mg_connection *c);
MG_INTERNAL size_t recv_avail_size(struct mg_connection *conn, size_t max);
MG_INTERNAL struct mg_connection *mg_create_connection( MG_INTERNAL struct mg_connection *mg_create_connection(
struct mg_mgr *mgr, mg_event_handler_t callback, struct mg_mgr *mgr, mg_event_handler_t callback,
struct mg_add_sock_opts opts); struct mg_add_sock_opts opts);
...@@ -2047,7 +2046,6 @@ MG_INTERNAL void mg_remove_conn(struct mg_connection *conn) { ...@@ -2047,7 +2046,6 @@ MG_INTERNAL void mg_remove_conn(struct mg_connection *conn) {
MG_INTERNAL void mg_call(struct mg_connection *nc, MG_INTERNAL void mg_call(struct mg_connection *nc,
mg_event_handler_t ev_handler, int ev, void *ev_data) { mg_event_handler_t ev_handler, int ev, void *ev_data) {
unsigned long flags_before;
if (ev_handler == NULL) { if (ev_handler == NULL) {
/* /*
* If protocol handler is specified, call it. Otherwise, call user-specified * If protocol handler is specified, call it. Otherwise, call user-specified
...@@ -2073,13 +2071,18 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, ...@@ -2073,13 +2071,18 @@ MG_INTERNAL void mg_call(struct mg_connection *nc,
/* LCOV_EXCL_STOP */ /* LCOV_EXCL_STOP */
#endif #endif
if (ev_handler != NULL) { if (ev_handler != NULL) {
flags_before = nc->flags; unsigned long flags_before = nc->flags;
size_t recv_mbuf_before = nc->recv_mbuf.len, recved;
ev_handler(nc, ev, ev_data); ev_handler(nc, ev, ev_data);
recved = (recv_mbuf_before - nc->recv_mbuf.len);
/* Prevent user handler from fiddling with system flags. */ /* Prevent user handler from fiddling with system flags. */
if (ev_handler == nc->handler && nc->flags != flags_before) { if (ev_handler == nc->handler && nc->flags != flags_before) {
nc->flags = (flags_before & ~_MG_CALLBACK_MODIFIABLE_FLAGS_MASK) | nc->flags = (flags_before & ~_MG_CALLBACK_MODIFIABLE_FLAGS_MASK) |
(nc->flags & _MG_CALLBACK_MODIFIABLE_FLAGS_MASK); (nc->flags & _MG_CALLBACK_MODIFIABLE_FLAGS_MASK);
} }
if (recved > 0 && !(nc->flags & MG_F_UDP)) {
mg_if_recved(nc, recved);
}
} }
DBG(("%p after %s flags=%lu rmbl=%d smbl=%d", nc, DBG(("%p after %s flags=%lu rmbl=%d smbl=%d", nc,
ev_handler == nc->handler ? "user" : "proto", nc->flags, ev_handler == nc->handler ? "user" : "proto", nc->flags,
...@@ -2566,13 +2569,6 @@ struct mg_connection *mg_if_accept_tcp_cb(struct mg_connection *lc, ...@@ -2566,13 +2569,6 @@ struct mg_connection *mg_if_accept_tcp_cb(struct mg_connection *lc,
return nc; return nc;
} }
MG_INTERNAL size_t recv_avail_size(struct mg_connection *conn, size_t max) {
size_t avail;
if (conn->recv_mbuf_limit < conn->recv_mbuf.len) return 0;
avail = conn->recv_mbuf_limit - conn->recv_mbuf.len;
return avail > max ? max : avail;
}
void mg_send(struct mg_connection *nc, const void *buf, int len) { void mg_send(struct mg_connection *nc, const void *buf, int len) {
nc->last_io_time = mg_time(); nc->last_io_time = mg_time();
if (nc->flags & MG_F_UDP) { if (nc->flags & MG_F_UDP) {
...@@ -2612,8 +2608,6 @@ static void mg_recv_common(struct mg_connection *nc, void *buf, int len) { ...@@ -2612,8 +2608,6 @@ static void mg_recv_common(struct mg_connection *nc, void *buf, int len) {
nc->recv_mbuf.buf = (char *) buf; nc->recv_mbuf.buf = (char *) buf;
nc->recv_mbuf.size = nc->recv_mbuf.len = len; nc->recv_mbuf.size = nc->recv_mbuf.len = len;
} else { } else {
size_t avail = recv_avail_size(nc, len);
len = avail;
mbuf_append(&nc->recv_mbuf, buf, len); mbuf_append(&nc->recv_mbuf, buf, len);
MG_FREE(buf); MG_FREE(buf);
} }
...@@ -2622,7 +2616,6 @@ static void mg_recv_common(struct mg_connection *nc, void *buf, int len) { ...@@ -2622,7 +2616,6 @@ static void mg_recv_common(struct mg_connection *nc, void *buf, int len) {
void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len) { void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len) {
mg_recv_common(nc, buf, len); mg_recv_common(nc, buf, len);
mg_if_recved(nc, len);
} }
void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len, void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len,
...@@ -2663,8 +2656,8 @@ void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len, ...@@ -2663,8 +2656,8 @@ void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len,
} else { } else {
/* Drop on the floor. */ /* Drop on the floor. */
MG_FREE(buf); MG_FREE(buf);
mg_if_recved(nc, len);
} }
mg_if_recved(nc, len);
} }
/* /*
...@@ -3211,6 +3204,13 @@ static void mg_write_to_socket(struct mg_connection *nc) { ...@@ -3211,6 +3204,13 @@ static void mg_write_to_socket(struct mg_connection *nc) {
mg_if_sent_cb(nc, n); mg_if_sent_cb(nc, n);
} }
MG_INTERNAL size_t recv_avail_size(struct mg_connection *conn, size_t max) {
size_t avail;
if (conn->recv_mbuf_limit < conn->recv_mbuf.len) return 0;
avail = conn->recv_mbuf_limit - conn->recv_mbuf.len;
return avail > max ? max : avail;
}
static void mg_read_from_socket(struct mg_connection *conn) { static void mg_read_from_socket(struct mg_connection *conn) {
int n = 0; int n = 0;
char *buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE); char *buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE);
......
...@@ -1469,7 +1469,6 @@ void mg_if_sent_cb(struct mg_connection *nc, int num_sent); ...@@ -1469,7 +1469,6 @@ void mg_if_sent_cb(struct mg_connection *nc, int num_sent);
* Receive callback. * Receive callback.
* buf must be heap-allocated and ownership is transferred to the core. * buf must be heap-allocated and ownership is transferred to the core.
* Core will acknowledge consumption by calling mg_if_recved. * Core will acknowledge consumption by calling mg_if_recved.
* No more than one chunk of data can be unacknowledged at any time.
*/ */
void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len); void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len);
void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len, void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment