Unverified Commit 3079784b authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #638 from capnproto/ignore-client-body

Extend HttpServer to inform the caller when a connection ends cleanly on drain().
parents 35bead7b 098004cd
......@@ -80,7 +80,7 @@ public:
uint64_t n = kj::min(limit - doneSoFar, sizeof(buffer));
if (n == 0) return doneSoFar;
return input.tryRead(buffer, 1, sizeof(buffer))
return input.tryRead(buffer, 1, n)
.then([this](size_t amount) -> Promise<uint64_t> {
if (amount == 0) return doneSoFar; // EOF
doneSoFar += amount;
......@@ -1704,10 +1704,6 @@ public:
} else if (url == "/ws-inline") {
auto ws = response.acceptWebSocket(responseHeaders);
return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
} else if (url == "/ws-detached") {
auto ws = response.acceptWebSocket(responseHeaders);
tasks.add(doWebSocket(*ws, "start-detached").attach(kj::mv(ws)));
return kj::READY_NOW;
} else {
KJ_FAIL_ASSERT("unexpected path", url);
......@@ -1767,8 +1763,6 @@ const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] =
{ 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' };
{ 0x81, 0x0e, 's','t','a','r','t','-','d','e','t','a','c','h','e','d' };
{ 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 };
......@@ -1909,31 +1903,6 @@ KJ_TEST("HttpServer WebSocket handshake") {
KJ_TEST("HttpServer WebSocket handshake detached") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(io.provider->getTimer(), *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /ws-detached", WEBSOCKET_REQUEST_HANDSHAKE);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_DETACHED).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope);
KJ_TEST("HttpServer WebSocket handshake error") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
......@@ -3255,13 +3255,13 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host)
class HttpServer::Connection final: private HttpService::Response {
Connection(HttpServer& server, kj::Own<kj::AsyncIoStream>&& stream,
Connection(HttpServer& server, kj::AsyncIoStream& stream,
HttpService& service)
: server(server),
httpInput(*stream, server.requestHeaderTable),
ownStream(kj::mv(stream)) {
httpInput(stream, server.requestHeaderTable),
httpOutput(stream) {
~Connection() noexcept(false) {
......@@ -3272,7 +3272,7 @@ public:
kj::Promise<void> loop(bool firstRequest) {
kj::Promise<bool> loop(bool firstRequest) {
auto firstByte = httpInput.awaitNextMessage();
if (!firstRequest) {
......@@ -3322,10 +3322,10 @@ public:
return receivedHeaders
.then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<void> {
.then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<bool> {
if (closed) {
// Client closed connection. Close our end too.
return httpOutput.flush();
return httpOutput.flush().then([]() { return false; });
if (timedOut) {
// Client took too long to send anything, so we're going to close the connection. In
......@@ -3342,7 +3342,7 @@ public:
// error in the case that the server is draining, which also sets timedOut = true; see
// above.
return httpOutput.flush();
return httpOutput.flush().then([this]() { return server.draining; });
KJ_IF_MAYBE(req, request) {
......@@ -3359,8 +3359,8 @@ public:
auto promise = service.request(
req->method, req->url, headers, *body, *this);
return promise.attach(kj::mv(body))
.then([this]() -> kj::Promise<void> {
return promise.then(kj::mvCapture(body,
[this](kj::Own<kj::AsyncInputStream> body) -> kj::Promise<bool> {
// Response done. Await next request.
KJ_IF_MAYBE(p, webSocketError) {
......@@ -3371,14 +3371,16 @@ public:
if (upgraded) {
// We've upgraded to WebSocket so we can exit this listen loop. In fact, we no longer
// own the stream.
// Note that the WebSocket itself also flush()es the httpOutput before writing any
// WebSocket content, but we should also make sure that we don't let the listen loop
// exit until that flush is done, since we can't destroy the HttpOutputStream in the
// meantime.
return httpOutput.flush();
// We've upgraded to WebSocket, and by now we should have closed the WebSocket.
if (!webSocketClosed) {
// This is gonna segfault later so abort now instead.
KJ_LOG(FATAL, "Accepted WebSocket object must be destroyed before HttpService "
"request handler completes.");
// Once we start a WebSocket there's no going back to HTTP.
return false;
if (currentMethod != nullptr) {
......@@ -3386,20 +3388,66 @@ public:
"ERROR: The HttpService did not generate a response."));
if (server.draining) {
// Never mind, drain time.
return httpOutput.flush();
return httpOutput.flush().then(kj::mvCapture(body,
[this](kj::Own<kj::AsyncInputStream> body) -> kj::Promise<bool> {
if (httpInput.canReuse()) {
// Things look clean. Go ahead and accept the next request.
// Note that we don't have to handle server.draining here because we'll take care of
// it the next time around the loop.
return loop(false);
} else {
// Apparently, the application did not read the request body. Maybe this is a bug,
// or maybe not: maybe the client tried to upload too much data and the application
// legitimately wants to cancel the upload without reading all it it.
// We have a problem, though: We did send a response, and we didn't send
// `Connection: close`, so the client may expect that it can send another request.
// Perhaps the client has even finished sending the previous request's body, in
// which case the moment it finishes receiving the response, it could be completely
// within its rights to start a new request. If we close the socket now, we might
// interrupt that new request.
// There's no way we can get out of this perfectly cleanly. HTTP just isn't good
// enough at connection management. The best we can do is give the client some grace
// period and then abort the connection.
auto dummy = kj::heap<HttpDiscardingEntityWriter>();
auto lengthGrace = body->pumpTo(*dummy, server.settings.canceledUploadGraceBytes)
.then([this](size_t amount) {
if (httpInput.canReuse()) {
// Success, we can continue.
return true;
} else {
// Still more data. Give up.
return false;
lengthGrace = lengthGrace.attach(kj::mv(dummy), kj::mv(body));
return httpOutput.flush().then([this]() { return loop(false); });
auto timeGrace = server.timer.afterDelay(server.settings.canceledUploadGacePeriod)
.then([]() { return false; });
return lengthGrace.exclusiveJoin(kj::mv(timeGrace))
.then([this](bool clean) -> kj::Promise<bool> {
if (clean) {
// We recovered. Continue loop.
return loop(false);
} else {
// Client still not done. Return broken.
return false;
} else {
// Bad request.
return sendError(400, "Bad Request", kj::str(
"ERROR: The headers sent by your client were not valid."));
}).catch_([this](kj::Exception&& e) -> kj::Promise<void> {
}).catch_([this](kj::Exception&& e) -> kj::Promise<bool> {
// Exception; report 500.
if (currentMethod == nullptr) {
......@@ -3419,7 +3467,7 @@ public:
KJ_LOG(ERROR, "HttpService threw exception after generating a partial response",
"too late to report error to client", e);
return kj::READY_NOW;
return false;
if (e.getType() == kj::Exception::Type::OVERLOADED) {
......@@ -3434,7 +3482,7 @@ public:
// again later, not now"). Here's an idea: Don't send any response; just close the
// connection, so that it looks like the connection between the HTTP client and server
// was dropped. A good client should treat this exactly the way we want.
return kj::READY_NOW;
return false;
} else {
return sendError(500, "Internal Server Error", kj::str(
"ERROR: The server threw an exception. Details:\n\n", e));
......@@ -3444,15 +3492,16 @@ public:
HttpServer& server;
kj::AsyncIoStream& stream;
HttpService& service;
HttpInputStream httpInput;
HttpOutputStream httpOutput;
kj::Own<kj::AsyncIoStream> ownStream;
kj::Maybe<HttpMethod> currentMethod;
bool timedOut = false;
bool closed = false;
bool upgraded = false;
kj::Maybe<kj::Promise<void>> webSocketError;
bool webSocketClosed = false;
kj::Maybe<kj::Promise<bool>> webSocketError;
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
......@@ -3516,8 +3565,6 @@ private:
return sendWebSocketError(400, "Bad Request", kj::str("ERROR: Missing Sec-WebSocket-Key"));
upgraded = true;
auto websocketAccept = generateWebSocketAccept(key);
kj::StringPtr connectionHeaders[WEBSOCKET_CONNECTION_HEADERS_COUNT];
......@@ -3528,10 +3575,18 @@ private:
101, "Switching Protocols", connectionHeaders));
return upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, nullptr);
upgraded = true;
// We need to give the WebSocket an Own<AsyncIoStream>, but we only have a reference. This is
// safe because the application is expected to drop the WebSocket object before returning
// from the request handler. For some extra safety, we check that webSocketClosed has been
// set true when the handler returns.
auto deferNoteClosed = kj::defer([this]() { webSocketClosed = true; });
kj::Own<kj::AsyncIoStream> ownStream(&stream, kj::NullDisposer::instance);
return upgradeToWebSocket(ownStream.attach(kj::mv(deferNoteClosed)),
httpInput, httpOutput, nullptr);
kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, kj::String body) {
kj::Promise<bool> sendError(uint statusCode, kj::StringPtr statusText, kj::String body) {
HttpHeaders failed(server.requestHeaderTable);
failed.set(HttpHeaderId::CONNECTION, "close");
failed.set(HttpHeaderId::CONTENT_LENGTH, kj::str(body.size()));
......@@ -3541,7 +3596,7 @@ private:
httpOutput.writeHeaders(failed.serializeResponse(statusCode, statusText));
return httpOutput.flush(); // loop ends after flush
return httpOutput.flush().then([]() { return false; }); // loop ends after flush
kj::Own<WebSocket> sendWebSocketError(
......@@ -3631,15 +3686,23 @@ kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) {
kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) {
auto promise = listenHttpCleanDrain(*connection).ignoreResult();
// eagerlyEvaluate() to maintain historical guarantee that this method eagerly closes the
// connection when done.
return promise.attach(kj::mv(connection)).eagerlyEvaluate(nullptr);
kj::Promise<bool> HttpServer::listenHttpCleanDrain(kj::AsyncIoStream& connection) {
kj::Own<Connection> obj;
KJ_SWITCH_ONEOF(service) {
KJ_CASE_ONEOF(ptr, HttpService*) {
obj = heap<Connection>(*this, kj::mv(connection), *ptr);
obj = heap<Connection>(*this, connection, *ptr);
KJ_CASE_ONEOF(func, HttpServiceFactory) {
auto srv = func(*connection);
obj = heap<Connection>(*this, kj::mv(connection), *srv);
auto srv = func(connection);
obj = heap<Connection>(*this, connection, *srv);
obj = obj.attach(kj::mv(srv));
......@@ -640,6 +640,14 @@ struct HttpServerSettings {
kj::Duration pipelineTimeout = 5 * kj::SECONDS;
// After one request/response completes, we'll wait up to this long for a pipelined request to
// arrive.
kj::Duration canceledUploadGacePeriod = 1 * kj::SECONDS;
size_t canceledUploadGraceBytes = 65536;
// If the HttpService sends a response and returns without having read the entire request body,
// then we have to decide whether to close the connection or wait for the client to finish the
// request so that it can pipeline the next one. We'll give them a grace period defined by the
// above two values -- if they hit either one, we'll close the socket, but if the request
// completes, we'll let the connection stay open to handle more requests.
class HttpServer: private kj::TaskSet::ErrorHandler {
......@@ -680,6 +688,14 @@ public:
// The promise throws if an unparseable request is received or if some I/O error occurs. Dropping
// the returned promise will cancel all I/O on the connection and cancel any in-flight requests.
kj::Promise<bool> listenHttpCleanDrain(kj::AsyncIoStream& connection);
// Like listenHttp(), but allows you to potentially drain the server without closing connections.
// The returned promise resolves to `true` if the connection has been left in a state where a
// new HttpServer could potentially accept further requests from it. If `false`, then the
// connection is either in an inconsistent state or already completed a closing handshake; the
// caller should close it without any further reads/writes. Note this only ever returns `true`
// if you called `drain()` -- otherwise this server would keep handling the connection.
class Connection;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment