Commit 502baf92 authored by Kenton Varda's avatar Kenton Varda

Fix regressions in HTTP drain.

parent badade33
......@@ -1051,19 +1051,7 @@ public:
});
}
// Slightly-crappy code to snarf the expected line break. This will actually eat the leading
// regex /\r*\n?/.
while (lineBreakBeforeNextHeader && leftover.size() > 0) {
if (leftover[0] == '\r') {
leftover = leftover.slice(1, leftover.size());
} else if (leftover[0] == '\n') {
leftover = leftover.slice(1, leftover.size());
lineBreakBeforeNextHeader = false;
} else {
// Err, missing line break, whatever.
lineBreakBeforeNextHeader = false;
}
}
snarfBufferedLineBreak();
if (!lineBreakBeforeNextHeader && leftover != nullptr) {
return true;
......@@ -1080,6 +1068,13 @@ public:
});
}
bool isCleanDrain() {
// Returns whether we can cleanly drain the stream at this point.
if (onMessageDone != nullptr) return false;
snarfBufferedLineBreak();
return !lineBreakBeforeNextHeader && leftover != nullptr;
}
kj::Promise<kj::ArrayPtr<char>> readMessageHeaders() {
++pendingMessageCount;
auto paf = kj::newPromiseAndFulfiller<void>();
......@@ -1362,6 +1357,22 @@ private:
}
});
}
void snarfBufferedLineBreak() {
// Slightly-crappy code to snarf the expected line break. This will actually eat the leading
// regex /\r*\n?/.
while (lineBreakBeforeNextHeader && leftover.size() > 0) {
if (leftover[0] == '\r') {
leftover = leftover.slice(1, leftover.size());
} else if (leftover[0] == '\n') {
leftover = leftover.slice(1, leftover.size());
lineBreakBeforeNextHeader = false;
} else {
// Err, missing line break, whatever.
lineBreakBeforeNextHeader = false;
}
}
}
};
// -----------------------------------------------------------------------------
......@@ -4064,18 +4075,29 @@ public:
}
kj::Promise<bool> loop(bool firstRequest) {
if (!firstRequest && server.draining && httpInput.isCleanDrain()) {
// Don't call awaitNextMessage() in this case because that will initiate a read() which will
// immediately be canceled, losing data.
return true;
}
auto firstByte = httpInput.awaitNextMessage();
if (!firstRequest) {
// For requests after the first, require that the first byte arrive before the pipeline
// timeout, otherwise treat it like the connection was simply closed.
auto timeoutPromise = server.timer.afterDelay(server.settings.pipelineTimeout)
.exclusiveJoin(server.onDrain.addBranch())
.then([this]() -> bool {
auto timeoutPromise = server.timer.afterDelay(server.settings.pipelineTimeout);
if (httpInput.isCleanDrain()) {
// If we haven't buffered any data, then we can safely drain here, so allow the wait to
// be canceled by the onDrain promise.
timeoutPromise = timeoutPromise.exclusiveJoin(server.onDrain.addBranch());
}
firstByte = firstByte.exclusiveJoin(timeoutPromise.then([this]() -> bool {
timedOut = true;
return false;
});
firstByte = firstByte.exclusiveJoin(kj::mv(timeoutPromise));
}));
}
auto receivedHeaders = firstByte
......@@ -4114,10 +4136,6 @@ public:
return receivedHeaders
.then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<bool> {
if (closed) {
// Client closed connection. Close our end too.
return httpOutput.flush().then([]() { return false; });
}
if (timedOut) {
// Client took too long to send anything, so we're going to close the connection. In
// theory, we should send back an HTTP 408 error -- it is designed exactly for this
......@@ -4133,7 +4151,14 @@ public:
// error in the case that the server is draining, which also sets timedOut = true; see
// above.
return httpOutput.flush().then([this]() { return server.draining; });
return httpOutput.flush().then([this]() {
return server.draining && httpInput.isCleanDrain();
});
}
if (closed) {
// Client closed connection. Close our end too.
return httpOutput.flush().then([]() { return false; });
}
KJ_IF_MAYBE(req, request) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment