Commit 22a76b40 authored by Kenton Varda's avatar Kenton Varda

Fix bug in `kj::newOneWayPipe(0)`.

parent 4972582d
...@@ -946,6 +946,32 @@ KJ_TEST("Userland pipe pumpTo with limit") { ...@@ -946,6 +946,32 @@ KJ_TEST("Userland pipe pumpTo with limit") {
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
} }
KJ_TEST("Userland pipe pump into zero-limited pipe, no data to pump") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe(uint64_t(0));
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
expectRead(*pipe2.in, "");
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 0);
}
KJ_TEST("Userland pipe pump into zero-limited pipe, data is pumped") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe(uint64_t(0));
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
expectRead(*pipe2.in, "");
auto writePromise = pipe.out->write("foo", 3);
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pumpPromise.wait(ws));
}
KJ_TEST("Userland pipe gather write") { KJ_TEST("Userland pipe gather write") {
kj::EventLoop loop; kj::EventLoop loop;
WaitScope ws(loop); WaitScope ws(loop);
......
...@@ -966,7 +966,27 @@ private: ...@@ -966,7 +966,27 @@ private:
KJ_FAIL_REQUIRE("abortRead() has been called"); KJ_FAIL_REQUIRE("abortRead() has been called");
} }
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override { Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called"); // There might not actually be any data in `input`, in which case a pump wouldn't actually
// write anything and wouldn't fail.
if (input.tryGetLength().orDefault(1) == 0) {
// Yeah a pump would pump nothing.
return Promise<uint64_t>(uint64_t(0));
} else {
// While we *could* just return nullptr here, it would probably then fall back to a normal
// buffered pump, which would allocate a big old buffer just to find there's nothing to
// read. Let's try reading 1 byte to avoid that allocation.
static char c;
return input.tryRead(&c, 1, 1).then([](size_t n) {
if (n == 0) {
// Yay, we're at EOF as hoped.
return uint64_t(0);
} else {
// There was data in the input. The pump would have thrown.
KJ_FAIL_REQUIRE("abortRead() has been called");
}
});
}
} }
void shutdownWrite() override { void shutdownWrite() override {
// ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped, // ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped,
...@@ -1112,7 +1132,7 @@ public: ...@@ -1112,7 +1132,7 @@ public:
LimitedInputStream(kj::Own<AsyncInputStream> inner, uint64_t limit) LimitedInputStream(kj::Own<AsyncInputStream> inner, uint64_t limit)
: inner(kj::mv(inner)), limit(limit) { : inner(kj::mv(inner)), limit(limit) {
if (limit == 0) { if (limit == 0) {
inner = nullptr; this->inner = nullptr;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment