Commit 674fe0c1 authored by Kenton Varda's avatar Kenton Varda

Proactively cancel HTTP service when client disconnects.

This prevents a hanging service from being a permanent memory leak -- as long as the client is still there waiting.

(TCP servers will need additionally to apply connection-level timeouts/keepalives to detect disappearing clients.)
parent 0b9f906e
......@@ -2558,6 +2558,64 @@ KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
"Hello, World!", text);
}
class HangingHttpService final: public HttpService {
// HttpService that hangs forever.
public:
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
kj::Promise<void> result = kj::NEVER_DONE;
++inFlight;
return result.attach(kj::defer([this]() {
if (--inFlight == 0) {
KJ_IF_MAYBE(f, onCancelFulfiller) {
f->get()->fulfill();
}
}
}));
}
kj::Promise<void> onCancel() {
auto paf = kj::newPromiseAndFulfiller<void>();
onCancelFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
uint inFlight = 0;
private:
kj::Maybe<kj::Exception> exception;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onCancelFulfiller;
};
KJ_TEST("HttpServer disconnects ") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
HangingHttpService service;
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
KJ_EXPECT(service.inFlight == 0);
static constexpr auto request = "GET / HTTP/1.1\r\n\r\n"_kj;
pipe.ends[1]->write(request.begin(), request.size()).wait(waitScope);
auto cancelPromise = service.onCancel();
KJ_EXPECT(!cancelPromise.poll(waitScope));
KJ_EXPECT(service.inFlight == 1);
// Disconnect client and verify server cancels.
pipe.ends[1] = nullptr;
KJ_ASSERT(cancelPromise.poll(waitScope));
KJ_EXPECT(service.inFlight == 0);
cancelPromise.wait(waitScope);
}
// -----------------------------------------------------------------------------
KJ_TEST("newHttpService from HttpClient") {
......
......@@ -4849,7 +4849,10 @@ kj::Promise<bool> HttpServer::listenHttpCleanDrain(kj::AsyncIoStream& connection
}
}
auto promise = obj->loop(true);
// Start reading requests and responding to them, but immediately cancel processing if the client
// disconnects.
auto promise = obj->loop(true)
.exclusiveJoin(connection.whenWriteDisconnected().then([]() {return false;}));
// Eagerly evaluate so that we drop the connection when the promise resolves, even if the caller
// doesn't eagerly evaluate.
......
......@@ -629,6 +629,9 @@ public:
//
// `url` and `headers` are invalidated on the first read from `requestBody` or when the returned
// promise resolves, whichever comes first.
//
// Request processing can be canceled by dropping the returned promise. HttpServer may do so if
// the client disconnects prematurely.
virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host);
// Handles CONNECT requests. Only relevant for proxy services. Default implementation throws
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment