Commit 8d2ed06b authored by Kenton Varda's avatar Kenton Varda

Fix pipe bug where tryPumpFrom() behaved differently than pumpTo().

parent 3b85e1c1
......@@ -1289,5 +1289,25 @@ KJ_TEST("Userland pipe pumpTo less than write amount") {
writePromise.wait(ws);
}
KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foobar", 6);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
KJ_EXPECT(!pumpPromise.poll(ws));
pipe.out = nullptr;
pipe2.in = nullptr; // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
}
} // namespace
} // namespace kj
......@@ -522,7 +522,26 @@ private:
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
// The input might have reached EOF, but we haven't detected it yet because we haven't tried
// to read that far. If we had not optimized tryPumpFrom() and instead used the default
// pumpTo() implementation, then the input would not have called write() again once it
// reached EOF, and therefore the abortRead() on the other end would *not* propagate an
// exception! We need the same behavior here. To that end, we need to detect if we're at EOF
// by reading one last byte.
checkEofTask = kj::evalNow([&]() {
static char junk;
return input.tryRead(&junk, 1, 1).then([this](uint64_t n) {
if (n == 0) {
fulfiller.fulfill(kj::cp(pumpedSoFar));
} else {
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
}
}).eagerlyEvaluate([this](kj::Exception&& e) {
fulfiller.reject(kj::mv(e));
});
});
pipe.endState(*this);
pipe.abortRead();
}
......@@ -547,6 +566,7 @@ private:
uint64_t amount;
uint64_t pumpedSoFar = 0;
Canceler canceler;
kj::Promise<void> checkEofTask = nullptr;
};
class BlockedRead final: public AsyncIoStream {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment