Commit d1bd1529 authored by Sergey Lyubka's avatar Sergey Lyubka

request pipelining done

parent d00b7f43
...@@ -66,6 +66,9 @@ typedef __int64 int64_t; ...@@ -66,6 +66,9 @@ typedef __int64 int64_t;
#ifndef va_copy #ifndef va_copy
#define va_copy(x,y) x = y #define va_copy(x,y) x = y
#endif // MINGW #defines va_copy #endif // MINGW #defines va_copy
#ifndef __func__
#define __func__ ""
#endif
#else #else
#include <inttypes.h> #include <inttypes.h>
#include <unistd.h> #include <unistd.h>
...@@ -98,7 +101,7 @@ struct linked_list_link { struct linked_list_link *prev, *next; }; ...@@ -98,7 +101,7 @@ struct linked_list_link { struct linked_list_link *prev, *next; };
#define MAX_REQUEST_SIZE 16384 #define MAX_REQUEST_SIZE 16384
#define IOBUF_SIZE 8192 #define IOBUF_SIZE 8192
#define MAX_PATH_SIZE 8192 #define MAX_PATH_SIZE 8192
#define DBG(x) do { printf("%s:%d: ", __FILE__, __LINE__); \ #define DBG(x) do { printf("%s::%s() ", __FILE__, __func__); \
printf x; putchar('\n'); fflush(stdout); } while(0) printf x; putchar('\n'); fflush(stdout); } while(0)
//#define DBG(x) //#define DBG(x)
...@@ -131,6 +134,7 @@ struct mg_server { ...@@ -131,6 +134,7 @@ struct mg_server {
char *config_options[NUM_OPTIONS]; char *config_options[NUM_OPTIONS];
mg_event_handler_t event_handler; mg_event_handler_t event_handler;
void *user_data; void *user_data;
sock_t ctl[2]; // Control socketpair. Used to wake up from select() call
}; };
struct iobuf { struct iobuf {
...@@ -534,13 +538,13 @@ static int is_valid_http_method(const char *method) { ...@@ -534,13 +538,13 @@ static int is_valid_http_method(const char *method) {
// This function modifies the buffer by NUL-terminating // This function modifies the buffer by NUL-terminating
// HTTP request components, header names and header values. // HTTP request components, header names and header values.
static int parse_http_message(char *buf, int len, struct mg_request_info *ri) { static int parse_http_message(char *buf, int len, struct mg_request_info *ri) {
int is_request, request_length = get_request_len((unsigned char *) buf, len); int is_request, request_len = get_request_len((unsigned char *) buf, len);
if (request_length > 0) { if (request_len > 0) {
// Reset attributes. DO NOT TOUCH is_ssl, remote_ip, remote_port // Reset attributes. DO NOT TOUCH is_ssl, remote_ip, remote_port
ri->remote_user = ri->request_method = ri->uri = ri->http_version = NULL; ri->remote_user = ri->request_method = ri->uri = ri->http_version = NULL;
ri->num_headers = 0; ri->num_headers = 0;
buf[request_length - 1] = '\0'; buf[request_len - 1] = '\0';
// RFC says that all initial whitespaces should be ingored // RFC says that all initial whitespaces should be ingored
while (*buf != '\0' && isspace(* (unsigned char *) buf)) { while (*buf != '\0' && isspace(* (unsigned char *) buf)) {
...@@ -555,7 +559,7 @@ static int parse_http_message(char *buf, int len, struct mg_request_info *ri) { ...@@ -555,7 +559,7 @@ static int parse_http_message(char *buf, int len, struct mg_request_info *ri) {
is_request = is_valid_http_method(ri->request_method); is_request = is_valid_http_method(ri->request_method);
if ((is_request && memcmp(ri->http_version, "HTTP/", 5) != 0) || if ((is_request && memcmp(ri->http_version, "HTTP/", 5) != 0) ||
(!is_request && memcmp(ri->request_method, "HTTP/", 5) != 0)) { (!is_request && memcmp(ri->request_method, "HTTP/", 5) != 0)) {
request_length = -1; request_len = -1;
} else { } else {
if (is_request) { if (is_request) {
ri->http_version += 5; ri->http_version += 5;
...@@ -563,7 +567,7 @@ static int parse_http_message(char *buf, int len, struct mg_request_info *ri) { ...@@ -563,7 +567,7 @@ static int parse_http_message(char *buf, int len, struct mg_request_info *ri) {
parse_http_headers(&buf, ri); parse_http_headers(&buf, ri);
} }
} }
return request_length; return request_len;
} }
static int parse_range_header(const char *header, int64_t *a, int64_t *b) { static int parse_range_header(const char *header, int64_t *a, int64_t *b) {
...@@ -658,6 +662,21 @@ int mg_url_decode(const char *src, int src_len, char *dst, ...@@ -658,6 +662,21 @@ int mg_url_decode(const char *src, int src_len, char *dst,
return i >= src_len ? j : -1; return i >= src_len ? j : -1;
} }
// Return HTTP header value, or NULL if not found.
static const char *get_header(const struct mg_request_info *ri, const char *s) {
int i;
for (i = 0; i < ri->num_headers; i++)
if (!strcmp(s, ri->http_headers[i].name))
return ri->http_headers[i].value;
return NULL;
}
const char *mg_get_header(const struct mg_connection *conn, const char *name) {
return get_header(&conn->request_info, name);
}
// Return 1 if real file has been found, 0 otherwise // Return 1 if real file has been found, 0 otherwise
static int convert_uri_to_file_name(struct mg_connection *conn, char *buf, static int convert_uri_to_file_name(struct mg_connection *conn, char *buf,
size_t buf_len, struct stat *st) { size_t buf_len, struct stat *st) {
...@@ -721,7 +740,6 @@ static int vspool(struct iobuf *io, const char *fmt, va_list ap) { ...@@ -721,7 +740,6 @@ static int vspool(struct iobuf *io, const char *fmt, va_list ap) {
va_copy(ap_copy, ap); va_copy(ap_copy, ap);
len = vsnprintf(NULL, 0, fmt, ap_copy); len = vsnprintf(NULL, 0, fmt, ap_copy);
DBG(("len = %d", len));
if (len <= 0) { if (len <= 0) {
} else if (len < io->size - io->len || } else if (len < io->size - io->len ||
...@@ -753,7 +771,7 @@ static void write_to_client(struct mg_connection *conn) { ...@@ -753,7 +771,7 @@ static void write_to_client(struct mg_connection *conn) {
struct iobuf *io = &conn->remote_iobuf; struct iobuf *io = &conn->remote_iobuf;
int n = send(conn->client_sock, io->buf, io->len, 0); int n = send(conn->client_sock, io->buf, io->len, 0);
DBG(("Written %d of %d(%d): [%.*s]", n, io->len, io->size, 0, io->buf)); //DBG(("Written %d of %d(%d): [%.*s]", n, io->len, io->size, 0, io->buf));
if (is_error(n)) { if (is_error(n)) {
conn->flags |= CONN_CLOSE; conn->flags |= CONN_CLOSE;
...@@ -791,6 +809,29 @@ static void open_local_endpoint(struct mg_connection *conn) { ...@@ -791,6 +809,29 @@ static void open_local_endpoint(struct mg_connection *conn) {
} }
} }
static int io_space(const struct iobuf *io) {
return io->size - io->len;
}
static void process_request(struct mg_connection *conn) {
struct iobuf *io = &conn->local_iobuf;
DBG(("parse_http_message(%d [%.*s])", io->len, io->len, io->buf));
if (conn->request_len == 0) {
conn->request_len = parse_http_message(io->buf, io->len,
&conn->request_info);
}
DBG(("parse_http_message() -> %d", conn->request_len));
if (conn->request_len < 0 ||
(conn->request_len == 0 && io_space(io) <= 0)) {
// Invalid request, or request is too big: close the connection
conn->flags |= CONN_CLOSE;
} else if (conn->request_len > 0 && conn->endpoint_type == EP_NONE) {
open_local_endpoint(conn);
}
}
static void read_from_client(struct mg_connection *conn) { static void read_from_client(struct mg_connection *conn) {
struct iobuf *io = &conn->local_iobuf; struct iobuf *io = &conn->local_iobuf;
int n = recv(conn->client_sock, io->buf + io->len, io->size - io->len, 0); int n = recv(conn->client_sock, io->buf + io->len, io->size - io->len, 0);
...@@ -803,76 +844,90 @@ static void read_from_client(struct mg_connection *conn) { ...@@ -803,76 +844,90 @@ static void read_from_client(struct mg_connection *conn) {
conn->flags |= CONN_CLOSE; conn->flags |= CONN_CLOSE;
} else if (n > 0) { } else if (n > 0) {
io->len += n; io->len += n;
if (conn->request_len == 0) { process_request(conn);
conn->request_len = parse_http_message(io->buf, io->len,
&conn->request_info);
}
if (conn->request_len < 0 ||
(conn->request_len == 0 || io->len >= io->size)) {
conn->flags |= CONN_CLOSE;
} else if (conn->endpoint_type == EP_NONE) {
open_local_endpoint(conn);
}
} }
} }
void add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) { static int should_keep_alive(const struct mg_connection *conn) {
FD_SET(sock, set); const char *method = conn->request_info.request_method;
if (sock > *max_fd) { const char *http_version = conn->request_info.http_version;
*max_fd = sock; const char *header = mg_get_header(conn, "Connection");
} return method != NULL && !strcmp(method, "GET") &&
((header != NULL && !strcmp(header, "keep-alive")) ||
(header == NULL && http_version && !strcmp(http_version, "1.1")));
} }
static int io_space(const struct iobuf *io) { static void close_local_endpoint(struct mg_connection *conn) {
return io->size - io->len; struct iobuf *io = &conn->local_iobuf;
int keep_alive = should_keep_alive(conn); // Must be done before memmove!
// Close file descriptor
switch (conn->endpoint_type) {
case EP_FILE: close(conn->endpoint.fd); break;
default: assert(1); break;
}
// Get rid of that request from the buffer. NOTE: order is important here
assert(conn->request_len <= io->len);
memmove(io->buf, io->buf + conn->request_len, io->len - conn->request_len);
io->len -= conn->request_len;
conn->endpoint_type = EP_NONE;
conn->request_len = 0;
if (keep_alive) {
DBG(("keep alive!"));
process_request(conn); // Can call us recursively!
} else {
DBG(("closing!"));
conn->flags |= conn->remote_iobuf.len == 0 ? CONN_CLOSE : CONN_SPOOL_DONE;
}
} }
static void transfer_file_data(struct mg_connection *conn) { static void transfer_file_data(struct mg_connection *conn) {
struct iobuf *io = &conn->remote_iobuf, *iol = &conn->local_iobuf; struct iobuf *io = &conn->remote_iobuf;
int n = read(conn->endpoint.fd, io->buf + io->len, io->size - io->len); int n, rem_space = io_space(io);
//DBG(("%s: %d", __func__, n));
if (rem_space <= 0) return;
n = read(conn->endpoint.fd, io->buf + io->len, rem_space);
if (is_error(n)) { if (is_error(n)) {
close(conn->endpoint.fd); close_local_endpoint(conn);
conn->endpoint.fd = 0;
conn->endpoint_type = EP_NONE;
memmove(iol->buf, iol->buf + conn->request_len,
iol->len - conn->request_len);
iol->len -= conn->request_len;
conn->request_len = 0;
if (iol->len <= 0) {
conn->flags |= io->len > 0 ? CONN_SPOOL_DONE : CONN_CLOSE;
} else {
DBG(("%s", "more req!"));
}
//DBG(("%s: %s", __func__, "YEEEEEE"));
//conn->flags |= io->len > 0 ? CONN_SPOOL_DONE : CONN_CLOSE;
} else if (n > 0) { } else if (n > 0) {
io->len += n; io->len += n;
} }
} }
void add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) {
FD_SET(sock, set);
if (sock > *max_fd) {
*max_fd = sock;
}
}
void mg_poll_server(struct mg_server *server, unsigned int milliseconds) { void mg_poll_server(struct mg_server *server, unsigned int milliseconds) {
struct linked_list_link *lp, *tmp; struct linked_list_link *lp, *tmp;
struct mg_connection *conn; struct mg_connection *conn;
struct timeval tv; struct timeval tv;
fd_set read_set, write_set; fd_set read_set, write_set;
sock_t max_fd = -1; sock_t max_fd = -1;
time_t current_time = time(NULL), expire_time = current_time + 3; time_t current_time = time(NULL), expire_time = current_time +
atoi(server->config_options[IDLE_TIMEOUT_MS]) / 1000;
FD_ZERO(&read_set); FD_ZERO(&read_set);
FD_ZERO(&write_set); FD_ZERO(&write_set);
add_to_set(server->listening_sock, &read_set, &max_fd); add_to_set(server->listening_sock, &read_set, &max_fd);
add_to_set(server->ctl[0], &read_set, &max_fd);
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) { LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
conn = LINKED_LIST_ENTRY(lp, struct mg_connection, link); conn = LINKED_LIST_ENTRY(lp, struct mg_connection, link);
add_to_set(conn->client_sock, &read_set, &max_fd); add_to_set(conn->client_sock, &read_set, &max_fd);
if (conn->endpoint_type == EP_FILE && io_space(&conn->remote_iobuf) > 0) { if (conn->endpoint_type == EP_FILE) {
transfer_file_data(conn); transfer_file_data(conn);
// read can return 0, meaning EOF. Need to signal the writer to close.
add_to_set(conn->client_sock, &write_set, &max_fd);
} }
if (conn->remote_iobuf.len > 0) { if (conn->remote_iobuf.len > 0) {
add_to_set(conn->client_sock, &write_set, &max_fd); add_to_set(conn->client_sock, &write_set, &max_fd);
} else if (conn->flags & CONN_CLOSE) {
close_conn(conn);
} }
} }
...@@ -895,7 +950,6 @@ void mg_poll_server(struct mg_server *server, unsigned int milliseconds) { ...@@ -895,7 +950,6 @@ void mg_poll_server(struct mg_server *server, unsigned int milliseconds) {
read_from_client(conn); read_from_client(conn);
} }
if (FD_ISSET(conn->client_sock, &write_set)) { if (FD_ISSET(conn->client_sock, &write_set)) {
assert(conn->remote_iobuf.len >= 0);
conn->expire_time = expire_time; conn->expire_time = expire_time;
write_to_client(conn); write_to_client(conn);
} }
...@@ -942,6 +996,7 @@ struct mg_server *mg_create_server(const char *opts[], mg_event_handler_t func, ...@@ -942,6 +996,7 @@ struct mg_server *mg_create_server(const char *opts[], mg_event_handler_t func,
server->event_handler = func; server->event_handler = func;
server->user_data = user_data; server->user_data = user_data;
LINKED_LIST_INIT(&server->active_connections); LINKED_LIST_INIT(&server->active_connections);
mg_socketpair(server->ctl);
while (opts != NULL && (name = *opts++) != NULL) { while (opts != NULL && (name = *opts++) != NULL) {
if ((i = get_option_index(name)) == -1) { if ((i = get_option_index(name)) == -1) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment