Commit dba7f35b authored by Kenton Varda's avatar Kenton Varda

Fix userland pipe bug that propagated EOF from a pump.

A pump does not propagate EOF. So a BlockedRead state should not complete when a pump happens that does not satisfy the read.
parent 71a8a674
......@@ -1702,6 +1702,36 @@ KJ_TEST("Userland pipe multi-part write doesn't quit early") {
writePromise.wait(ws);
}
KJ_TEST("Userland pipe BlockedRead gets empty tryPumpFrom") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
// First start a read from the back end.
char buffer[4];
auto readPromise = pipe2.in->tryRead(buffer, 1, 4);
// Now arrange a pump between the pipes, using tryPumpFrom().
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
// Disconnect the front pipe, causing EOF on the pump.
pipe.out = nullptr;
// The pump should have produced zero bytes.
KJ_EXPECT(pumpPromise.wait(ws) == 0);
// The read is incomplete.
KJ_EXPECT(!readPromise.poll(ws));
// A subsequent write() completes the read.
pipe2.out->write("foo", 3).wait(ws);
KJ_EXPECT(readPromise.wait(ws) == 3);
buffer[3] = '\0';
KJ_EXPECT(kj::StringPtr(buffer, 3) == "foo");
}
constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
......
......@@ -724,32 +724,33 @@ private:
auto maxToRead = kj::min(amount, readBuffer.size());
return canceler.wrap(input.tryRead(readBuffer.begin(), minToRead, maxToRead)
.then([this,&input,amount,minToRead](size_t actual) -> Promise<uint64_t> {
.then([this,&input,amount](size_t actual) -> Promise<uint64_t> {
readBuffer = readBuffer.slice(actual, readBuffer.size());
readSoFar += actual;
if (readSoFar >= minBytes || actual < minToRead) {
// We've read enough to close out this read (readSoFar >= minBytes)
// OR we reached EOF and couldn't complete the read (actual < minToRead)
// Either way, we want to close out this read.
if (readSoFar >= minBytes) {
// We've read enough to close out this read (readSoFar >= minBytes).
canceler.release();
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
if (actual < amount) {
// We din't complete pumping. Restart from the pipe.
// We didn't read as much data as the pump requested, but we did fulfill the read, so
// we don't know whether we reached EOF on the input. We need to continue the pump,
// replacing the BlockedRead state.
return input.pumpTo(pipe, amount - actual)
.then([actual](uint64_t actual2) -> uint64_t { return actual + actual2; });
} else {
// We pumped as much data as was requested, so we can return that now.
return actual;
}
} else {
// The pump completed without fulfilling the read. This either means that the pump
// reached EOF or the `amount` requested was not enough to satisfy the read in the first
// place. Pumps do not propagate EOF, so either way we want to leave the BlockedRead in
// place waiting for more data.
return actual;
}
// If we read less than `actual`, but more than `minToRead`, it can only have been
// because we reached `minBytes`, so the conditional above would have executed. So, here
// we know that actual == amount.
KJ_ASSERT(actual == amount);
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment