Unverified Commit 40eb7779 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #869 from capnproto/fix-http-write-cancel

Fix some error propagation bugs in HTTP.
parents 2ec71986 b0b82303
...@@ -976,6 +976,10 @@ KJ_TEST("HttpClient canceled write") { ...@@ -976,6 +976,10 @@ KJ_TEST("HttpClient canceled write") {
auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096)); auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096));
// Note: This poll() forces the server to receive what was written so far. Otherwise,
// cancelling the write below may in fact cancel the previous write as well.
KJ_EXPECT(!serverPromise.poll(waitScope));
// Start a write and immediately cancel it. // Start a write and immediately cancel it.
{ {
auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size()); auto ignore KJ_UNUSED = req.body->write(body.begin(), body.size());
......
...@@ -1568,6 +1568,9 @@ private: ...@@ -1568,6 +1568,9 @@ private:
return inner.tryRead(buffer, minBytes, kj::min(maxBytes, chunkSize)) return inner.tryRead(buffer, minBytes, kj::min(maxBytes, chunkSize))
.then([=](size_t amount) -> size_t { .then([=](size_t amount) -> size_t {
chunkSize -= amount; chunkSize -= amount;
if (amount < minBytes) {
kj::throwRecoverableException(KJ_EXCEPTION(DISCONNECTED, "premature EOF in HTTP chunk"));
}
return alreadyRead + amount; return alreadyRead + amount;
}); });
} }
...@@ -1744,6 +1747,8 @@ public: ...@@ -1744,6 +1747,8 @@ public:
KJ_REQUIRE(!writeInProgress, "concurrent write()s not allowed") { return kj::READY_NOW; } KJ_REQUIRE(!writeInProgress, "concurrent write()s not allowed") { return kj::READY_NOW; }
KJ_REQUIRE(inBody) { return kj::READY_NOW; } KJ_REQUIRE(inBody) { return kj::READY_NOW; }
// TODO(soon): Queueing writes this way is flawed, because it means the caller cannot cancel
// the write.
writeInProgress = true; writeInProgress = true;
auto fork = writeQueue.fork(); auto fork = writeQueue.fork();
writeQueue = fork.addBranch(); writeQueue = fork.addBranch();
...@@ -1791,6 +1796,16 @@ public: ...@@ -1791,6 +1796,16 @@ public:
KJ_REQUIRE(inBody) { return; } KJ_REQUIRE(inBody) { return; }
inBody = false; inBody = false;
if (writeInProgress) {
// It looks like the last write never completed -- possibly because it was canceled or threw
// an exception. We must treat this equivalent to abortBody().
broken = true;
// Cancel any writes that are still queued.
writeQueue = KJ_EXCEPTION(FAILED,
"previous HTTP message body incomplete; can't write more messages");
}
} }
void abortBody() { void abortBody() {
...@@ -1799,10 +1814,9 @@ public: ...@@ -1799,10 +1814,9 @@ public:
inBody = false; inBody = false;
broken = true; broken = true;
writeQueue = writeQueue.then([]() -> kj::Promise<void> { // Cancel any writes that are still queued.
return KJ_EXCEPTION(FAILED, writeQueue = KJ_EXCEPTION(FAILED,
"previous HTTP message body incomplete; can't write more messages"); "previous HTTP message body incomplete; can't write more messages");
});
} }
kj::Promise<void> flush() { kj::Promise<void> flush() {
...@@ -1815,6 +1829,8 @@ public: ...@@ -1815,6 +1829,8 @@ public:
return inner.whenWriteDisconnected(); return inner.whenWriteDisconnected();
} }
bool isWriteInProgress() { return writeInProgress; }
private: private:
AsyncOutputStream& inner; AsyncOutputStream& inner;
kj::Promise<void> writeQueue = kj::READY_NOW; kj::Promise<void> writeQueue = kj::READY_NOW;
...@@ -1867,7 +1883,9 @@ public: ...@@ -1867,7 +1883,9 @@ public:
if (length == 0) inner.finishBody(); if (length == 0) inner.finishBody();
} }
~HttpFixedLengthEntityWriter() noexcept(false) { ~HttpFixedLengthEntityWriter() noexcept(false) {
if (length > 0) inner.abortBody(); if (length > 0 || inner.isWriteInProgress()) {
inner.abortBody();
}
} }
Promise<void> write(const void* buffer, size_t size) override { Promise<void> write(const void* buffer, size_t size) override {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment