Commit b0b82303 authored by Kenton Varda's avatar Kenton Varda

Fix some error propagation bugs in HTTP.

parent 2ec71986
......@@ -976,6 +976,10 @@ KJ_TEST("HttpClient canceled write") {
auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096));
// Note: This poll() forces the server to receive what was written so far. Otherwise,
// cancelling the write below may in fact cancel the previous write as well.
KJ_EXPECT(!serverPromise.poll(waitScope));
// Start a write and immediately cancel it.
{
auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size());
......
......@@ -1568,6 +1568,9 @@ private:
return inner.tryRead(buffer, minBytes, kj::min(maxBytes, chunkSize))
.then([=](size_t amount) -> size_t {
chunkSize -= amount;
if (amount < minBytes) {
kj::throwRecoverableException(KJ_EXCEPTION(DISCONNECTED, "premature EOF in HTTP chunk"));
}
return alreadyRead + amount;
});
}
......@@ -1744,6 +1747,8 @@ public:
KJ_REQUIRE(!writeInProgress, "concurrent write()s not allowed") { return kj::READY_NOW; }
KJ_REQUIRE(inBody) { return kj::READY_NOW; }
// TODO(soon): Queueing writes this way is flawed, because it means the caller cannot cancel
// the write.
writeInProgress = true;
auto fork = writeQueue.fork();
writeQueue = fork.addBranch();
......@@ -1791,6 +1796,16 @@ public:
KJ_REQUIRE(inBody) { return; }
inBody = false;
if (writeInProgress) {
// It looks like the last write never completed -- possibly because it was canceled or threw
// an exception. We must treat this equivalent to abortBody().
broken = true;
// Cancel any writes that are still queued.
writeQueue = KJ_EXCEPTION(FAILED,
"previous HTTP message body incomplete; can't write more messages");
}
}
void abortBody() {
......@@ -1799,10 +1814,9 @@ public:
inBody = false;
broken = true;
writeQueue = writeQueue.then([]() -> kj::Promise<void> {
return KJ_EXCEPTION(FAILED,
// Cancel any writes that are still queued.
writeQueue = KJ_EXCEPTION(FAILED,
"previous HTTP message body incomplete; can't write more messages");
});
}
kj::Promise<void> flush() {
......@@ -1815,6 +1829,8 @@ public:
return inner.whenWriteDisconnected();
}
bool isWriteInProgress() { return writeInProgress; }
private:
AsyncOutputStream& inner;
kj::Promise<void> writeQueue = kj::READY_NOW;
......@@ -1867,7 +1883,9 @@ public:
if (length == 0) inner.finishBody();
}
~HttpFixedLengthEntityWriter() noexcept(false) {
if (length > 0) inner.abortBody();
if (length > 0 || inner.isWriteInProgress()) {
inner.abortBody();
}
}
Promise<void> write(const void* buffer, size_t size) override {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment