Unverified Commit 4972582d authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #820 from capnproto/async-pipe-eof-fulfills-pump-from-promise

Fulfill AsyncPipe::tryPumpFrom() on EOF
parents 0067a542 547bb03b
...@@ -1375,6 +1375,51 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") { ...@@ -1375,6 +1375,51 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
pipe2.out = nullptr; pipe2.out = nullptr;
} }
KJ_TEST("Userland pipe EOF fulfills pumpFrom promise") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto writePromise = pipe.out->write("foobar", 6);
KJ_EXPECT(!writePromise.poll(ws));
auto pipe3 = newOneWayPipe();
auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out);
KJ_EXPECT(!pumpPromise2.poll(ws));
expectRead(*pipe3.in, "foobar").wait(ws);
writePromise.wait(ws);
KJ_EXPECT(!pumpPromise.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
KJ_EXPECT(!pumpPromise2.poll(ws));
pipe2.out = nullptr;
KJ_EXPECT(pumpPromise2.wait(ws) == 6);
}
KJ_TEST("Userland pipe tryPumpFrom to pumpTo for same amount fulfills simultaneously") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 6));
auto writePromise = pipe.out->write("foobar", 6);
KJ_EXPECT(!writePromise.poll(ws));
auto pipe3 = newOneWayPipe();
auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out, 6);
KJ_EXPECT(!pumpPromise2.poll(ws));
expectRead(*pipe3.in, "foobar").wait(ws);
writePromise.wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 6);
KJ_EXPECT(pumpPromise2.wait(ws) == 6);
}
constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14; constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing // AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
......
...@@ -532,23 +532,17 @@ private: ...@@ -532,23 +532,17 @@ private:
canceler.release(); canceler.release();
pumpedSoFar += actual; pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount); KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) { if (pumpedSoFar == amount || actual < n) {
fulfiller.fulfill(kj::cp(amount)); // Either we pumped all we wanted or we hit EOF.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this); pipe.endState(*this);
return pipe.pumpTo(output, amount2 - actual)
.then([actual](uint64_t actual2) { return actual + actual2; });
} }
KJ_ASSERT(actual <= amount2); // Completed entire pumpTo amount.
if (actual == amount2) { KJ_ASSERT(actual == amount2);
// Completed entire pumpTo amount. return amount2;
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return pipe.pumpTo(output, amount2 - actual);
}
})); }));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment