Commit 5b34c066 authored by Kenton Varda's avatar Kenton Varda

Add ability for HttpServer to call a callback to construct connection-specific HttpService objects.

This is particularly useful for implementing logic to grab the client's IP address and shove it in X-Real-IP.
parent 328e6f5b
...@@ -3251,8 +3251,10 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host) ...@@ -3251,8 +3251,10 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host)
class HttpServer::Connection final: private HttpService::WebSocketResponse { class HttpServer::Connection final: private HttpService::WebSocketResponse {
public: public:
Connection(HttpServer& server, kj::Own<kj::AsyncIoStream>&& stream) Connection(HttpServer& server, kj::Own<kj::AsyncIoStream>&& stream,
HttpService& service)
: server(server), : server(server),
service(service),
httpInput(*stream, server.requestHeaderTable), httpInput(*stream, server.requestHeaderTable),
httpOutput(*stream), httpOutput(*stream),
ownStream(kj::mv(stream)) { ownStream(kj::mv(stream)) {
...@@ -3358,7 +3360,7 @@ public: ...@@ -3358,7 +3360,7 @@ public:
KJ_IF_MAYBE(key, headers.get(HttpHeaderId::SEC_WEBSOCKET_KEY)) { KJ_IF_MAYBE(key, headers.get(HttpHeaderId::SEC_WEBSOCKET_KEY)) {
currentMethod = HttpMethod::GET; currentMethod = HttpMethod::GET;
websocketKey = kj::str(*key); websocketKey = kj::str(*key);
promise = server.service.openWebSocket(req->url, httpInput.getHeaders(), *this); promise = service.openWebSocket(req->url, httpInput.getHeaders(), *this);
} else { } else {
return sendError(400, "Bad Request", kj::str("ERROR: Missing Sec-WebSocket-Key")); return sendError(400, "Bad Request", kj::str("ERROR: Missing Sec-WebSocket-Key"));
} }
...@@ -3373,7 +3375,7 @@ public: ...@@ -3373,7 +3375,7 @@ public:
// be able to shutdown the upstream but still wait on the downstream, but I believe many // be able to shutdown the upstream but still wait on the downstream, but I believe many
// other HTTP servers do similar things. // other HTTP servers do similar things.
promise = server.service.request( promise = service.request(
req->method, req->url, headers, *body, *this); req->method, req->url, headers, *body, *this);
promise = promise.attach(kj::mv(body)); promise = promise.attach(kj::mv(body));
} }
...@@ -3448,6 +3450,7 @@ public: ...@@ -3448,6 +3450,7 @@ public:
private: private:
HttpServer& server; HttpServer& server;
HttpService& service;
HttpInputStream httpInput; HttpInputStream httpInput;
HttpOutputStream httpOutput; HttpOutputStream httpOutput;
kj::Own<kj::AsyncIoStream> ownStream; kj::Own<kj::AsyncIoStream> ownStream;
...@@ -3534,13 +3537,20 @@ private: ...@@ -3534,13 +3537,20 @@ private:
HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service, HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings) Settings settings)
: HttpServer(timer, requestHeaderTable, service, settings, : HttpServer(timer, requestHeaderTable, &service, settings,
kj::newPromiseAndFulfiller<void>()) {} kj::newPromiseAndFulfiller<void>()) {}
HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service, HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable,
HttpServiceFactory serviceFactory, Settings settings)
: HttpServer(timer, requestHeaderTable, kj::mv(serviceFactory), settings,
kj::newPromiseAndFulfiller<void>()) {}
HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable,
kj::OneOf<HttpService*, HttpServiceFactory> service,
Settings settings, kj::PromiseFulfillerPair<void> paf) Settings settings, kj::PromiseFulfillerPair<void> paf)
: timer(timer), requestHeaderTable(requestHeaderTable), service(service), settings(settings), : timer(timer), requestHeaderTable(requestHeaderTable), service(kj::mv(service)),
onDrain(paf.promise.fork()), drainFulfiller(kj::mv(paf.fulfiller)), tasks(*this) {} settings(settings), onDrain(paf.promise.fork()), drainFulfiller(kj::mv(paf.fulfiller)),
tasks(*this) {}
kj::Promise<void> HttpServer::drain() { kj::Promise<void> HttpServer::drain() {
KJ_REQUIRE(!draining, "you can only call drain() once"); KJ_REQUIRE(!draining, "you can only call drain() once");
...@@ -3575,7 +3585,19 @@ kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) { ...@@ -3575,7 +3585,19 @@ kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) {
} }
kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) { kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) {
auto obj = heap<Connection>(*this, kj::mv(connection)); kj::Own<Connection> obj;
KJ_SWITCH_ONEOF(service) {
KJ_CASE_ONEOF(ptr, HttpService*) {
obj = heap<Connection>(*this, kj::mv(connection), *ptr);
}
KJ_CASE_ONEOF(func, HttpServiceFactory) {
auto srv = func(*connection);
obj = heap<Connection>(*this, kj::mv(connection), *srv);
obj = obj.attach(kj::mv(srv));
}
}
auto promise = obj->loop(true); auto promise = obj->loop(true);
// Eagerly evaluate so that we drop the connection when the promise resolves, even if the caller // Eagerly evaluate so that we drop the connection when the promise resolves, even if the caller
......
...@@ -652,6 +652,7 @@ class HttpServer: private kj::TaskSet::ErrorHandler { ...@@ -652,6 +652,7 @@ class HttpServer: private kj::TaskSet::ErrorHandler {
public: public:
typedef HttpServerSettings Settings; typedef HttpServerSettings Settings;
typedef kj::Function<kj::Own<HttpService>(kj::AsyncIoStream&)> HttpServiceFactory;
HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service, HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings = Settings()); Settings settings = Settings());
...@@ -659,6 +660,12 @@ public: ...@@ -659,6 +660,12 @@ public:
// may be a host service or a proxy service depending on whether you are intending to implement // may be a host service or a proxy service depending on whether you are intending to implement
// an HTTP server or an HTTP proxy. // an HTTP server or an HTTP proxy.
HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable,
HttpServiceFactory serviceFactory, Settings settings = Settings());
// Like the other constructor, but allows a new HttpService object to be used for each
// connection, based on the connection object. This is particularly useful for capturing the
// client's IP address and injecting it as a header.
kj::Promise<void> drain(); kj::Promise<void> drain();
// Stop accepting new connections or new requests on existing connections. Finish any requests // Stop accepting new connections or new requests on existing connections. Finish any requests
// that are already executing, then close the connections. Returns once no more requests are // that are already executing, then close the connections. Returns once no more requests are
...@@ -683,7 +690,7 @@ private: ...@@ -683,7 +690,7 @@ private:
kj::Timer& timer; kj::Timer& timer;
HttpHeaderTable& requestHeaderTable; HttpHeaderTable& requestHeaderTable;
HttpService& service; kj::OneOf<HttpService*, HttpServiceFactory> service;
Settings settings; Settings settings;
bool draining = false; bool draining = false;
...@@ -695,7 +702,8 @@ private: ...@@ -695,7 +702,8 @@ private:
kj::TaskSet tasks; kj::TaskSet tasks;
HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service, HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable,
kj::OneOf<HttpService*, HttpServiceFactory> service,
Settings settings, kj::PromiseFulfillerPair<void> paf); Settings settings, kj::PromiseFulfillerPair<void> paf);
kj::Promise<void> listenLoop(kj::ConnectionReceiver& port); kj::Promise<void> listenLoop(kj::ConnectionReceiver& port);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment