Commit 098004cd authored by Kenton Varda's avatar Kenton Varda

Extend HttpServer to inform the caller when a connection ends cleanly on drain().

This allows an application which calls drain() to potentially pass off HTTP connections to a new HttpServer afterwards. Without this, the application has no way to know if the connections are in an indeterminant state.

This change also makes it OK for an application to fail to read the whole request body. Previously, if an app returned a response without reading the whole request, an exception would eventually be thrown, but potentially not until the client had initiated a new request on the same connection. The client would then get a spurious 500 error.
parent 18f7ff52
...@@ -1704,10 +1704,6 @@ public: ...@@ -1704,10 +1704,6 @@ public:
} else if (url == "/ws-inline") { } else if (url == "/ws-inline") {
auto ws = response.acceptWebSocket(responseHeaders); auto ws = response.acceptWebSocket(responseHeaders);
return doWebSocket(*ws, "start-inline").attach(kj::mv(ws)); return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
} else if (url == "/ws-detached") {
auto ws = response.acceptWebSocket(responseHeaders);
tasks.add(doWebSocket(*ws, "start-detached").attach(kj::mv(ws)));
return kj::READY_NOW;
} else { } else {
KJ_FAIL_ASSERT("unexpected path", url); KJ_FAIL_ASSERT("unexpected path", url);
} }
...@@ -1767,8 +1763,6 @@ const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] = ...@@ -1767,8 +1763,6 @@ const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] =
"\r\n"; "\r\n";
const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] = const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] =
{ 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' }; { 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' };
const byte WEBSOCKET_FIRST_MESSAGE_DETACHED[] =
{ 0x81, 0x0e, 's','t','a','r','t','-','d','e','t','a','c','h','e','d' };
const byte WEBSOCKET_SEND_MESSAGE[] = const byte WEBSOCKET_SEND_MESSAGE[] =
{ 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 }; { 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 };
const byte WEBSOCKET_REPLY_MESSAGE[] = const byte WEBSOCKET_REPLY_MESSAGE[] =
...@@ -1909,31 +1903,6 @@ KJ_TEST("HttpServer WebSocket handshake") { ...@@ -1909,31 +1903,6 @@ KJ_TEST("HttpServer WebSocket handshake") {
listenTask.wait(io.waitScope); listenTask.wait(io.waitScope);
} }
KJ_TEST("HttpServer WebSocket handshake detached") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(io.provider->getTimer(), *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /ws-detached", WEBSOCKET_REQUEST_HANDSHAKE);
pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
listenTask.wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_DETACHED).wait(io.waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope);
}
KJ_TEST("HttpServer WebSocket handshake error") { KJ_TEST("HttpServer WebSocket handshake error") {
auto io = kj::setupAsyncIo(); auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe(); auto pipe = io.provider->newTwoWayPipe();
......
...@@ -3255,13 +3255,13 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host) ...@@ -3255,13 +3255,13 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host)
class HttpServer::Connection final: private HttpService::Response { class HttpServer::Connection final: private HttpService::Response {
public: public:
Connection(HttpServer& server, kj::Own<kj::AsyncIoStream>&& stream, Connection(HttpServer& server, kj::AsyncIoStream& stream,
HttpService& service) HttpService& service)
: server(server), : server(server),
stream(stream),
service(service), service(service),
httpInput(*stream, server.requestHeaderTable), httpInput(stream, server.requestHeaderTable),
httpOutput(*stream), httpOutput(stream) {
ownStream(kj::mv(stream)) {
++server.connectionCount; ++server.connectionCount;
} }
~Connection() noexcept(false) { ~Connection() noexcept(false) {
...@@ -3272,7 +3272,7 @@ public: ...@@ -3272,7 +3272,7 @@ public:
} }
} }
kj::Promise<void> loop(bool firstRequest) { kj::Promise<bool> loop(bool firstRequest) {
auto firstByte = httpInput.awaitNextMessage(); auto firstByte = httpInput.awaitNextMessage();
if (!firstRequest) { if (!firstRequest) {
...@@ -3322,10 +3322,10 @@ public: ...@@ -3322,10 +3322,10 @@ public:
} }
return receivedHeaders return receivedHeaders
.then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<void> { .then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<bool> {
if (closed) { if (closed) {
// Client closed connection. Close our end too. // Client closed connection. Close our end too.
return httpOutput.flush(); return httpOutput.flush().then([]() { return false; });
} }
if (timedOut) { if (timedOut) {
// Client took too long to send anything, so we're going to close the connection. In // Client took too long to send anything, so we're going to close the connection. In
...@@ -3342,7 +3342,7 @@ public: ...@@ -3342,7 +3342,7 @@ public:
// error in the case that the server is draining, which also sets timedOut = true; see // error in the case that the server is draining, which also sets timedOut = true; see
// above. // above.
return httpOutput.flush(); return httpOutput.flush().then([this]() { return server.draining; });
} }
KJ_IF_MAYBE(req, request) { KJ_IF_MAYBE(req, request) {
...@@ -3359,8 +3359,8 @@ public: ...@@ -3359,8 +3359,8 @@ public:
auto promise = service.request( auto promise = service.request(
req->method, req->url, headers, *body, *this); req->method, req->url, headers, *body, *this);
return promise.attach(kj::mv(body)) return promise.then(kj::mvCapture(body,
.then([this]() -> kj::Promise<void> { [this](kj::Own<kj::AsyncInputStream> body) -> kj::Promise<bool> {
// Response done. Await next request. // Response done. Await next request.
KJ_IF_MAYBE(p, webSocketError) { KJ_IF_MAYBE(p, webSocketError) {
...@@ -3371,14 +3371,16 @@ public: ...@@ -3371,14 +3371,16 @@ public:
} }
if (upgraded) { if (upgraded) {
// We've upgraded to WebSocket so we can exit this listen loop. In fact, we no longer // We've upgraded to WebSocket, and by now we should have closed the WebSocket.
// own the stream. if (!webSocketClosed) {
// // This is gonna segfault later so abort now instead.
// Note that the WebSocket itself also flush()es the httpOutput before writing any KJ_LOG(FATAL, "Accepted WebSocket object must be destroyed before HttpService "
// WebSocket content, but we should also make sure that we don't let the listen loop "request handler completes.");
// exit until that flush is done, since we can't destroy the HttpOutputStream in the abort();
// meantime. }
return httpOutput.flush();
// Once we start a WebSocket there's no going back to HTTP.
return false;
} }
if (currentMethod != nullptr) { if (currentMethod != nullptr) {
...@@ -3386,20 +3388,66 @@ public: ...@@ -3386,20 +3388,66 @@ public:
"ERROR: The HttpService did not generate a response.")); "ERROR: The HttpService did not generate a response."));
} }
if (server.draining) { return httpOutput.flush().then(kj::mvCapture(body,
// Never mind, drain time. [this](kj::Own<kj::AsyncInputStream> body) -> kj::Promise<bool> {
return httpOutput.flush(); if (httpInput.canReuse()) {
// Things look clean. Go ahead and accept the next request.
// Note that we don't have to handle server.draining here because we'll take care of
// it the next time around the loop.
return loop(false);
} else {
// Apparently, the application did not read the request body. Maybe this is a bug,
// or maybe not: maybe the client tried to upload too much data and the application
// legitimately wants to cancel the upload without reading all it it.
//
// We have a problem, though: We did send a response, and we didn't send
// `Connection: close`, so the client may expect that it can send another request.
// Perhaps the client has even finished sending the previous request's body, in
// which case the moment it finishes receiving the response, it could be completely
// within its rights to start a new request. If we close the socket now, we might
// interrupt that new request.
//
// There's no way we can get out of this perfectly cleanly. HTTP just isn't good
// enough at connection management. The best we can do is give the client some grace
// period and then abort the connection.
auto dummy = kj::heap<HttpDiscardingEntityWriter>();
auto lengthGrace = body->pumpTo(*dummy, server.settings.canceledUploadGraceBytes)
.then([this](size_t amount) {
if (httpInput.canReuse()) {
// Success, we can continue.
return true;
} else {
// Still more data. Give up.
return false;
} }
});
lengthGrace = lengthGrace.attach(kj::mv(dummy), kj::mv(body));
return httpOutput.flush().then([this]() { return loop(false); }); auto timeGrace = server.timer.afterDelay(server.settings.canceledUploadGacePeriod)
.then([]() { return false; });
return lengthGrace.exclusiveJoin(kj::mv(timeGrace))
.then([this](bool clean) -> kj::Promise<bool> {
if (clean) {
// We recovered. Continue loop.
return loop(false);
} else {
// Client still not done. Return broken.
return false;
}
}); });
}
}));
}));
} else { } else {
// Bad request. // Bad request.
return sendError(400, "Bad Request", kj::str( return sendError(400, "Bad Request", kj::str(
"ERROR: The headers sent by your client were not valid.")); "ERROR: The headers sent by your client were not valid."));
} }
}).catch_([this](kj::Exception&& e) -> kj::Promise<void> { }).catch_([this](kj::Exception&& e) -> kj::Promise<bool> {
// Exception; report 500. // Exception; report 500.
if (currentMethod == nullptr) { if (currentMethod == nullptr) {
...@@ -3419,7 +3467,7 @@ public: ...@@ -3419,7 +3467,7 @@ public:
KJ_LOG(ERROR, "HttpService threw exception after generating a partial response", KJ_LOG(ERROR, "HttpService threw exception after generating a partial response",
"too late to report error to client", e); "too late to report error to client", e);
} }
return kj::READY_NOW; return false;
} }
if (e.getType() == kj::Exception::Type::OVERLOADED) { if (e.getType() == kj::Exception::Type::OVERLOADED) {
...@@ -3434,7 +3482,7 @@ public: ...@@ -3434,7 +3482,7 @@ public:
// again later, not now"). Here's an idea: Don't send any response; just close the // again later, not now"). Here's an idea: Don't send any response; just close the
// connection, so that it looks like the connection between the HTTP client and server // connection, so that it looks like the connection between the HTTP client and server
// was dropped. A good client should treat this exactly the way we want. // was dropped. A good client should treat this exactly the way we want.
return kj::READY_NOW; return false;
} else { } else {
return sendError(500, "Internal Server Error", kj::str( return sendError(500, "Internal Server Error", kj::str(
"ERROR: The server threw an exception. Details:\n\n", e)); "ERROR: The server threw an exception. Details:\n\n", e));
...@@ -3444,15 +3492,16 @@ public: ...@@ -3444,15 +3492,16 @@ public:
private: private:
HttpServer& server; HttpServer& server;
kj::AsyncIoStream& stream;
HttpService& service; HttpService& service;
HttpInputStream httpInput; HttpInputStream httpInput;
HttpOutputStream httpOutput; HttpOutputStream httpOutput;
kj::Own<kj::AsyncIoStream> ownStream;
kj::Maybe<HttpMethod> currentMethod; kj::Maybe<HttpMethod> currentMethod;
bool timedOut = false; bool timedOut = false;
bool closed = false; bool closed = false;
bool upgraded = false; bool upgraded = false;
kj::Maybe<kj::Promise<void>> webSocketError; bool webSocketClosed = false;
kj::Maybe<kj::Promise<bool>> webSocketError;
kj::Own<kj::AsyncOutputStream> send( kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers, uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
...@@ -3516,8 +3565,6 @@ private: ...@@ -3516,8 +3565,6 @@ private:
return sendWebSocketError(400, "Bad Request", kj::str("ERROR: Missing Sec-WebSocket-Key")); return sendWebSocketError(400, "Bad Request", kj::str("ERROR: Missing Sec-WebSocket-Key"));
} }
upgraded = true;
auto websocketAccept = generateWebSocketAccept(key); auto websocketAccept = generateWebSocketAccept(key);
kj::StringPtr connectionHeaders[WEBSOCKET_CONNECTION_HEADERS_COUNT]; kj::StringPtr connectionHeaders[WEBSOCKET_CONNECTION_HEADERS_COUNT];
...@@ -3528,10 +3575,18 @@ private: ...@@ -3528,10 +3575,18 @@ private:
httpOutput.writeHeaders(headers.serializeResponse( httpOutput.writeHeaders(headers.serializeResponse(
101, "Switching Protocols", connectionHeaders)); 101, "Switching Protocols", connectionHeaders));
return upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, nullptr); upgraded = true;
// We need to give the WebSocket an Own<AsyncIoStream>, but we only have a reference. This is
// safe because the application is expected to drop the WebSocket object before returning
// from the request handler. For some extra safety, we check that webSocketClosed has been
// set true when the handler returns.
auto deferNoteClosed = kj::defer([this]() { webSocketClosed = true; });
kj::Own<kj::AsyncIoStream> ownStream(&stream, kj::NullDisposer::instance);
return upgradeToWebSocket(ownStream.attach(kj::mv(deferNoteClosed)),
httpInput, httpOutput, nullptr);
} }
kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, kj::String body) { kj::Promise<bool> sendError(uint statusCode, kj::StringPtr statusText, kj::String body) {
HttpHeaders failed(server.requestHeaderTable); HttpHeaders failed(server.requestHeaderTable);
failed.set(HttpHeaderId::CONNECTION, "close"); failed.set(HttpHeaderId::CONNECTION, "close");
failed.set(HttpHeaderId::CONTENT_LENGTH, kj::str(body.size())); failed.set(HttpHeaderId::CONTENT_LENGTH, kj::str(body.size()));
...@@ -3541,7 +3596,7 @@ private: ...@@ -3541,7 +3596,7 @@ private:
httpOutput.writeHeaders(failed.serializeResponse(statusCode, statusText)); httpOutput.writeHeaders(failed.serializeResponse(statusCode, statusText));
httpOutput.writeBodyData(kj::mv(body)); httpOutput.writeBodyData(kj::mv(body));
httpOutput.finishBody(); httpOutput.finishBody();
return httpOutput.flush(); // loop ends after flush return httpOutput.flush().then([]() { return false; }); // loop ends after flush
} }
kj::Own<WebSocket> sendWebSocketError( kj::Own<WebSocket> sendWebSocketError(
...@@ -3631,15 +3686,23 @@ kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) { ...@@ -3631,15 +3686,23 @@ kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) {
} }
kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) { kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) {
auto promise = listenHttpCleanDrain(*connection).ignoreResult();
// eagerlyEvaluate() to maintain historical guarantee that this method eagerly closes the
// connection when done.
return promise.attach(kj::mv(connection)).eagerlyEvaluate(nullptr);
}
kj::Promise<bool> HttpServer::listenHttpCleanDrain(kj::AsyncIoStream& connection) {
kj::Own<Connection> obj; kj::Own<Connection> obj;
KJ_SWITCH_ONEOF(service) { KJ_SWITCH_ONEOF(service) {
KJ_CASE_ONEOF(ptr, HttpService*) { KJ_CASE_ONEOF(ptr, HttpService*) {
obj = heap<Connection>(*this, kj::mv(connection), *ptr); obj = heap<Connection>(*this, connection, *ptr);
} }
KJ_CASE_ONEOF(func, HttpServiceFactory) { KJ_CASE_ONEOF(func, HttpServiceFactory) {
auto srv = func(*connection); auto srv = func(connection);
obj = heap<Connection>(*this, kj::mv(connection), *srv); obj = heap<Connection>(*this, connection, *srv);
obj = obj.attach(kj::mv(srv)); obj = obj.attach(kj::mv(srv));
} }
} }
......
...@@ -640,6 +640,14 @@ struct HttpServerSettings { ...@@ -640,6 +640,14 @@ struct HttpServerSettings {
kj::Duration pipelineTimeout = 5 * kj::SECONDS; kj::Duration pipelineTimeout = 5 * kj::SECONDS;
// After one request/response completes, we'll wait up to this long for a pipelined request to // After one request/response completes, we'll wait up to this long for a pipelined request to
// arrive. // arrive.
kj::Duration canceledUploadGacePeriod = 1 * kj::SECONDS;
size_t canceledUploadGraceBytes = 65536;
// If the HttpService sends a response and returns without having read the entire request body,
// then we have to decide whether to close the connection or wait for the client to finish the
// request so that it can pipeline the next one. We'll give them a grace period defined by the
// above two values -- if they hit either one, we'll close the socket, but if the request
// completes, we'll let the connection stay open to handle more requests.
}; };
class HttpServer: private kj::TaskSet::ErrorHandler { class HttpServer: private kj::TaskSet::ErrorHandler {
...@@ -680,6 +688,14 @@ public: ...@@ -680,6 +688,14 @@ public:
// The promise throws if an unparseable request is received or if some I/O error occurs. Dropping // The promise throws if an unparseable request is received or if some I/O error occurs. Dropping
// the returned promise will cancel all I/O on the connection and cancel any in-flight requests. // the returned promise will cancel all I/O on the connection and cancel any in-flight requests.
kj::Promise<bool> listenHttpCleanDrain(kj::AsyncIoStream& connection);
// Like listenHttp(), but allows you to potentially drain the server without closing connections.
// The returned promise resolves to `true` if the connection has been left in a state where a
// new HttpServer could potentially accept further requests from it. If `false`, then the
// connection is either in an inconsistent state or already completed a closing handshake; the
// caller should close it without any further reads/writes. Note this only ever returns `true`
// if you called `drain()` -- otherwise this server would keep handling the connection.
private: private:
class Connection; class Connection;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment