Commit 547bb03b authored by Harris Hancock's avatar Harris Hancock

Fulfill AsyncPipe::tryPumpFrom() on EOF

Previously if the input stream being pumped from encountered an EOF while the other side of the pipe was being pumped elsewhere, we did not fulfill the tryPumpFrom() promise, leaving it hanging.
parent 0067a542
......@@ -1375,6 +1375,51 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
pipe2.out = nullptr;
}
KJ_TEST("Userland pipe EOF fulfills pumpFrom promise") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto writePromise = pipe.out->write("foobar", 6);
KJ_EXPECT(!writePromise.poll(ws));
auto pipe3 = newOneWayPipe();
auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out);
KJ_EXPECT(!pumpPromise2.poll(ws));
expectRead(*pipe3.in, "foobar").wait(ws);
writePromise.wait(ws);
KJ_EXPECT(!pumpPromise.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
KJ_EXPECT(!pumpPromise2.poll(ws));
pipe2.out = nullptr;
KJ_EXPECT(pumpPromise2.wait(ws) == 6);
}
KJ_TEST("Userland pipe tryPumpFrom to pumpTo for same amount fulfills simultaneously") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 6));
auto writePromise = pipe.out->write("foobar", 6);
KJ_EXPECT(!writePromise.poll(ws));
auto pipe3 = newOneWayPipe();
auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out, 6);
KJ_EXPECT(!pumpPromise2.poll(ws));
expectRead(*pipe3.in, "foobar").wait(ws);
writePromise.wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 6);
KJ_EXPECT(pumpPromise2.wait(ws) == 6);
}
constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
......
......@@ -532,23 +532,17 @@ private:
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
fulfiller.fulfill(kj::cp(amount));
if (pumpedSoFar == amount || actual < n) {
// Either we pumped all we wanted or we hit EOF.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
return pipe.pumpTo(output, amount2 - actual)
.then([actual](uint64_t actual2) { return actual + actual2; });
}
KJ_ASSERT(actual <= amount2);
if (actual == amount2) {
// Completed entire pumpTo amount.
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return pipe.pumpTo(output, amount2 - actual);
}
// Completed entire pumpTo amount.
KJ_ASSERT(actual == amount2);
return amount2;
}));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment