Commit 53e1c469 authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by Cesanta Bot

Add ability for multipart data handler to provide pushback

It can specify how much data was actually processed and the rest will be re-delivered on next poll.

CL: mg: Add ability for multipart data handler to provide pushback

PUBLISHED_FROM=e0168c5064c3a32921c9209bc09f4da2079cd616
parent 139c2061
...@@ -9,6 +9,14 @@ signature: | ...@@ -9,6 +9,14 @@ signature: |
struct mg_str data; struct mg_str data;
int status; /* <0 on error */ int status; /* <0 on error */
void *user_data; void *user_data;
/*
* User handler can indicate how much of the data was consumed
* by setting this variable. By default, it is assumed that all
* data has been consumed by the handler.
* If not all data was consumed, user's handler will be invoked again later
* with the remainder.
*/
size_t num_data_consumed;
}; };
--- ---
......
...@@ -5901,6 +5901,7 @@ struct mg_http_multipart_stream { ...@@ -5901,6 +5901,7 @@ struct mg_http_multipart_stream {
void *user_data; void *user_data;
enum mg_http_multipart_stream_state state; enum mg_http_multipart_stream_state state;
int processing_part; int processing_part;
int data_avail;
}; };
struct mg_reverse_proxy_data { struct mg_reverse_proxy_data {
...@@ -6529,6 +6530,9 @@ void mg_http_handler(struct mg_connection *nc, int ev, ...@@ -6529,6 +6530,9 @@ void mg_http_handler(struct mg_connection *nc, int ev,
mg_http_call_endpoint_handler(nc, ev2, hm); mg_http_call_endpoint_handler(nc, ev2, hm);
} }
pd->rcvd = 0; pd->rcvd = 0;
if (pd->endpoint_handler != NULL && pd->endpoint_handler != nc->handler) {
mg_call(nc, pd->endpoint_handler, nc->user_data, ev, NULL);
}
} }
#if MG_ENABLE_FILESYSTEM #if MG_ENABLE_FILESYSTEM
...@@ -6539,17 +6543,24 @@ void mg_http_handler(struct mg_connection *nc, int ev, ...@@ -6539,17 +6543,24 @@ void mg_http_handler(struct mg_connection *nc, int ev,
mg_call(nc, nc->handler, nc->user_data, ev, ev_data); mg_call(nc, nc->handler, nc->user_data, ev, ev_data);
#if MG_ENABLE_HTTP_STREAMING_MULTIPART
if (pd->mp_stream.boundary != NULL &&
(ev == MG_EV_RECV || ev == MG_EV_POLL)) {
if (ev == MG_EV_RECV) { if (ev == MG_EV_RECV) {
struct mg_str *s;
pd->rcvd += *(int *) ev_data; pd->rcvd += *(int *) ev_data;
#if MG_ENABLE_HTTP_STREAMING_MULTIPART
if (pd->mp_stream.boundary != NULL) {
mg_http_multipart_continue(nc); mg_http_multipart_continue(nc);
} else if (pd->mp_stream.data_avail) {
/* Try re-delivering the data. */
mg_http_multipart_continue(nc);
}
return; return;
} }
#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ #endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */
if (ev == MG_EV_RECV) {
struct mg_str *s;
pd->rcvd += *(int *) ev_data;
again: again:
req_len = mg_parse_http(io->buf, io->len, hm, is_req); req_len = mg_parse_http(io->buf, io->len, hm, is_req);
...@@ -6744,8 +6755,9 @@ exit_mp: ...@@ -6744,8 +6755,9 @@ exit_mp:
#define CONTENT_DISPOSITION "Content-Disposition: " #define CONTENT_DISPOSITION "Content-Disposition: "
static void mg_http_multipart_call_handler(struct mg_connection *c, int ev, static size_t mg_http_multipart_call_handler(struct mg_connection *c, int ev,
const char *data, size_t data_len) { const char *data,
size_t data_len) {
struct mg_http_multipart_part mp; struct mg_http_multipart_part mp;
struct mg_http_proto_data *pd = mg_http_get_proto_data(c); struct mg_http_proto_data *pd = mg_http_get_proto_data(c);
memset(&mp, 0, sizeof(mp)); memset(&mp, 0, sizeof(mp));
...@@ -6755,8 +6767,11 @@ static void mg_http_multipart_call_handler(struct mg_connection *c, int ev, ...@@ -6755,8 +6767,11 @@ static void mg_http_multipart_call_handler(struct mg_connection *c, int ev,
mp.user_data = pd->mp_stream.user_data; mp.user_data = pd->mp_stream.user_data;
mp.data.p = data; mp.data.p = data;
mp.data.len = data_len; mp.data.len = data_len;
mp.num_data_consumed = data_len;
mg_call(c, pd->endpoint_handler, c->user_data, ev, &mp); mg_call(c, pd->endpoint_handler, c->user_data, ev, &mp);
pd->mp_stream.user_data = mp.user_data; pd->mp_stream.user_data = mp.user_data;
pd->mp_stream.data_avail = (mp.num_data_consumed != data_len);
return mp.num_data_consumed;
} }
static int mg_http_multipart_finalize(struct mg_connection *c) { static int mg_http_multipart_finalize(struct mg_connection *c) {
...@@ -6894,22 +6909,28 @@ static int mg_http_multipart_continue_wait_for_chunk(struct mg_connection *c) { ...@@ -6894,22 +6909,28 @@ static int mg_http_multipart_continue_wait_for_chunk(struct mg_connection *c) {
boundary = c_strnstr(io->buf, pd->mp_stream.boundary, io->len); boundary = c_strnstr(io->buf, pd->mp_stream.boundary, io->len);
if (boundary == NULL) { if (boundary == NULL) {
int data_size = (io->len - (pd->mp_stream.boundary_len + 6)); int data_len = (io->len - (pd->mp_stream.boundary_len + 6));
if (data_size > 0) { if (data_len > 0) {
mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA, io->buf, size_t consumed = mg_http_multipart_call_handler(
data_size); c, MG_EV_HTTP_PART_DATA, io->buf, (size_t) data_len);
mbuf_remove(io, data_size); mbuf_remove(io, consumed);
} }
return 0; return 0;
} else if (boundary != NULL) { } else if (boundary != NULL) {
int data_size = (boundary - io->buf - 4); size_t data_len = ((size_t)(boundary - io->buf) - 4);
mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA, io->buf, data_size); size_t consumed = mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA,
mbuf_remove(io, (boundary - io->buf)); io->buf, data_len);
mbuf_remove(io, consumed);
if (consumed == data_len) {
mbuf_remove(io, 4);
pd->mp_stream.state = MPS_WAITING_FOR_BOUNDARY; pd->mp_stream.state = MPS_WAITING_FOR_BOUNDARY;
return 1; return 1;
} else { } else {
return 0; return 0;
} }
} else {
return 0;
}
} }
static void mg_http_multipart_continue(struct mg_connection *c) { static void mg_http_multipart_continue(struct mg_connection *c) {
......
...@@ -4761,6 +4761,14 @@ struct mg_http_multipart_part { ...@@ -4761,6 +4761,14 @@ struct mg_http_multipart_part {
struct mg_str data; struct mg_str data;
int status; /* <0 on error */ int status; /* <0 on error */
void *user_data; void *user_data;
/*
* User handler can indicate how much of the data was consumed
* by setting this variable. By default, it is assumed that all
* data has been consumed by the handler.
* If not all data was consumed, user's handler will be invoked again later
* with the remainder.
*/
size_t num_data_consumed;
}; };
/* SSI call context */ /* SSI call context */
......
...@@ -140,6 +140,7 @@ struct mg_http_multipart_stream { ...@@ -140,6 +140,7 @@ struct mg_http_multipart_stream {
void *user_data; void *user_data;
enum mg_http_multipart_stream_state state; enum mg_http_multipart_stream_state state;
int processing_part; int processing_part;
int data_avail;
}; };
struct mg_reverse_proxy_data { struct mg_reverse_proxy_data {
...@@ -768,6 +769,9 @@ void mg_http_handler(struct mg_connection *nc, int ev, ...@@ -768,6 +769,9 @@ void mg_http_handler(struct mg_connection *nc, int ev,
mg_http_call_endpoint_handler(nc, ev2, hm); mg_http_call_endpoint_handler(nc, ev2, hm);
} }
pd->rcvd = 0; pd->rcvd = 0;
if (pd->endpoint_handler != NULL && pd->endpoint_handler != nc->handler) {
mg_call(nc, pd->endpoint_handler, nc->user_data, ev, NULL);
}
} }
#if MG_ENABLE_FILESYSTEM #if MG_ENABLE_FILESYSTEM
...@@ -778,17 +782,24 @@ void mg_http_handler(struct mg_connection *nc, int ev, ...@@ -778,17 +782,24 @@ void mg_http_handler(struct mg_connection *nc, int ev,
mg_call(nc, nc->handler, nc->user_data, ev, ev_data); mg_call(nc, nc->handler, nc->user_data, ev, ev_data);
#if MG_ENABLE_HTTP_STREAMING_MULTIPART
if (pd->mp_stream.boundary != NULL &&
(ev == MG_EV_RECV || ev == MG_EV_POLL)) {
if (ev == MG_EV_RECV) { if (ev == MG_EV_RECV) {
struct mg_str *s;
pd->rcvd += *(int *) ev_data; pd->rcvd += *(int *) ev_data;
#if MG_ENABLE_HTTP_STREAMING_MULTIPART
if (pd->mp_stream.boundary != NULL) {
mg_http_multipart_continue(nc); mg_http_multipart_continue(nc);
} else if (pd->mp_stream.data_avail) {
/* Try re-delivering the data. */
mg_http_multipart_continue(nc);
}
return; return;
} }
#endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */ #endif /* MG_ENABLE_HTTP_STREAMING_MULTIPART */
if (ev == MG_EV_RECV) {
struct mg_str *s;
pd->rcvd += *(int *) ev_data;
again: again:
req_len = mg_parse_http(io->buf, io->len, hm, is_req); req_len = mg_parse_http(io->buf, io->len, hm, is_req);
...@@ -983,8 +994,9 @@ exit_mp: ...@@ -983,8 +994,9 @@ exit_mp:
#define CONTENT_DISPOSITION "Content-Disposition: " #define CONTENT_DISPOSITION "Content-Disposition: "
static void mg_http_multipart_call_handler(struct mg_connection *c, int ev, static size_t mg_http_multipart_call_handler(struct mg_connection *c, int ev,
const char *data, size_t data_len) { const char *data,
size_t data_len) {
struct mg_http_multipart_part mp; struct mg_http_multipart_part mp;
struct mg_http_proto_data *pd = mg_http_get_proto_data(c); struct mg_http_proto_data *pd = mg_http_get_proto_data(c);
memset(&mp, 0, sizeof(mp)); memset(&mp, 0, sizeof(mp));
...@@ -994,8 +1006,11 @@ static void mg_http_multipart_call_handler(struct mg_connection *c, int ev, ...@@ -994,8 +1006,11 @@ static void mg_http_multipart_call_handler(struct mg_connection *c, int ev,
mp.user_data = pd->mp_stream.user_data; mp.user_data = pd->mp_stream.user_data;
mp.data.p = data; mp.data.p = data;
mp.data.len = data_len; mp.data.len = data_len;
mp.num_data_consumed = data_len;
mg_call(c, pd->endpoint_handler, c->user_data, ev, &mp); mg_call(c, pd->endpoint_handler, c->user_data, ev, &mp);
pd->mp_stream.user_data = mp.user_data; pd->mp_stream.user_data = mp.user_data;
pd->mp_stream.data_avail = (mp.num_data_consumed != data_len);
return mp.num_data_consumed;
} }
static int mg_http_multipart_finalize(struct mg_connection *c) { static int mg_http_multipart_finalize(struct mg_connection *c) {
...@@ -1133,22 +1148,28 @@ static int mg_http_multipart_continue_wait_for_chunk(struct mg_connection *c) { ...@@ -1133,22 +1148,28 @@ static int mg_http_multipart_continue_wait_for_chunk(struct mg_connection *c) {
boundary = c_strnstr(io->buf, pd->mp_stream.boundary, io->len); boundary = c_strnstr(io->buf, pd->mp_stream.boundary, io->len);
if (boundary == NULL) { if (boundary == NULL) {
int data_size = (io->len - (pd->mp_stream.boundary_len + 6)); int data_len = (io->len - (pd->mp_stream.boundary_len + 6));
if (data_size > 0) { if (data_len > 0) {
mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA, io->buf, size_t consumed = mg_http_multipart_call_handler(
data_size); c, MG_EV_HTTP_PART_DATA, io->buf, (size_t) data_len);
mbuf_remove(io, data_size); mbuf_remove(io, consumed);
} }
return 0; return 0;
} else if (boundary != NULL) { } else if (boundary != NULL) {
int data_size = (boundary - io->buf - 4); size_t data_len = ((size_t)(boundary - io->buf) - 4);
mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA, io->buf, data_size); size_t consumed = mg_http_multipart_call_handler(c, MG_EV_HTTP_PART_DATA,
mbuf_remove(io, (boundary - io->buf)); io->buf, data_len);
mbuf_remove(io, consumed);
if (consumed == data_len) {
mbuf_remove(io, 4);
pd->mp_stream.state = MPS_WAITING_FOR_BOUNDARY; pd->mp_stream.state = MPS_WAITING_FOR_BOUNDARY;
return 1; return 1;
} else { } else {
return 0; return 0;
} }
} else {
return 0;
}
} }
static void mg_http_multipart_continue(struct mg_connection *c) { static void mg_http_multipart_continue(struct mg_connection *c) {
......
...@@ -80,6 +80,14 @@ struct mg_http_multipart_part { ...@@ -80,6 +80,14 @@ struct mg_http_multipart_part {
struct mg_str data; struct mg_str data;
int status; /* <0 on error */ int status; /* <0 on error */
void *user_data; void *user_data;
/*
* User handler can indicate how much of the data was consumed
* by setting this variable. By default, it is assumed that all
* data has been consumed by the handler.
* If not all data was consumed, user's handler will be invoked again later
* with the remainder.
*/
size_t num_data_consumed;
}; };
/* SSI call context */ /* SSI call context */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment