Commit ff34f25b authored by Kenton Varda's avatar Kenton Varda

Address Harris' review comments.

parent 277803fb
...@@ -806,7 +806,7 @@ KJ_TEST("Userland pipe pumpTo cancel") { ...@@ -806,7 +806,7 @@ KJ_TEST("Userland pipe pumpTo cancel") {
expectRead(*pipe2.in, "baz").wait(ws); expectRead(*pipe2.in, "baz").wait(ws);
} }
KJ_TEST("Userland pipe pumpTo cancel") { KJ_TEST("Userland pipe tryPumpFrom cancel") {
kj::EventLoop loop; kj::EventLoop loop;
WaitScope ws(loop); WaitScope ws(loop);
...@@ -826,5 +826,307 @@ KJ_TEST("Userland pipe pumpTo cancel") { ...@@ -826,5 +826,307 @@ KJ_TEST("Userland pipe pumpTo cancel") {
expectRead(*pipe2.in, "baz").wait(ws); expectRead(*pipe2.in, "baz").wait(ws);
} }
KJ_TEST("Userland pipe with limit") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe(6);
{
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
promise.wait(ws);
}
{
auto promise = pipe.in->readAllText();
KJ_EXPECT(!promise.poll(ws));
auto promise2 = pipe.out->write("barbaz", 6);
KJ_EXPECT(promise.wait(ws) == "bar");
KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
}
// Further writes throw and reads return EOF.
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
}
KJ_TEST("Userland pipe pumpTo with limit") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe(6);
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
{
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
}
{
auto promise = expectRead(*pipe2.in, "bar");
KJ_EXPECT(!promise.poll(ws));
auto promise2 = pipe.out->write("barbaz", 6);
promise.wait(ws);
pumpPromise.wait(ws);
KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
}
// Further writes throw.
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
}
KJ_TEST("Userland pipe gather write") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foobar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
expectRead(*pipe.in, "bar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "fo").wait(ws);
expectRead(*pipe.in, "obar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foob").wait(ws);
expectRead(*pipe.in, "ar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write pump") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
expectRead(*pipe2.in, "bar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "fo").wait(ws);
expectRead(*pipe2.in, "obar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foob").wait(ws);
expectRead(*pipe2.in, "ar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write split pump on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3)
.then([&](uint64_t i) {
KJ_EXPECT(i == 3);
return pipe.in->pumpTo(*pipe2.out, 3);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe gather write split pump split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2)
.then([&](uint64_t i) {
KJ_EXPECT(i == 2);
return pipe.in->pumpTo(*pipe2.out, 4);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 4);
}
KJ_TEST("Userland pipe gather write split pump split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4)
.then([&](uint64_t i) {
KJ_EXPECT(i == 4);
return pipe.in->pumpTo(*pipe2.out, 2);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 2);
}
KJ_TEST("Userland pipe pumpTo less than write amount") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
auto pieces = kj::heapArray<ArrayPtr<const byte>>(2);
byte a[1] = { 'a' };
byte b[1] = { 'b' };
pieces[0] = arrayPtr(a, 1);
pieces[1] = arrayPtr(b, 1);
auto writePromise = pipe.out->write(pieces);
KJ_EXPECT(!writePromise.poll(ws));
expectRead(*pipe2.in, "a").wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 1);
KJ_EXPECT(!writePromise.poll(ws));
pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
expectRead(*pipe2.in, "b").wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 1);
writePromise.wait(ws);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -810,7 +810,7 @@ private: ...@@ -810,7 +810,7 @@ private:
})); }));
} }
auto remainder = pieces.slice(i + 1, pieces.size()); auto remainder = pieces.slice(i, pieces.size());
if (remainder.size() > 0) { if (remainder.size() > 0) {
auto& pipeRef = pipe; auto& pipeRef = pipe;
promise = promise.then([&pipeRef,remainder]() { promise = promise.then([&pipeRef,remainder]() {
...@@ -826,7 +826,7 @@ private: ...@@ -826,7 +826,7 @@ private:
} }
// Turns out we can forward this whole write. // Turns out we can forward this whole write.
KJ_ASSERT(pumpedSoFar + size == amount); KJ_ASSERT(size <= amount - pumpedSoFar);
return canceler.wrap(output.write(pieces).then([this,size]() { return canceler.wrap(output.write(pieces).then([this,size]() {
pumpedSoFar += size; pumpedSoFar += size;
KJ_ASSERT(pumpedSoFar <= amount); KJ_ASSERT(pumpedSoFar <= amount);
...@@ -949,7 +949,7 @@ private: ...@@ -949,7 +949,7 @@ private:
class PipeReadEnd final: public AsyncInputStream { class PipeReadEnd final: public AsyncInputStream {
public: public:
PipeReadEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {} PipeReadEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeReadEnd() { ~PipeReadEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() { unwind.catchExceptionsIfUnwinding([&]() {
pipe->abortRead(); pipe->abortRead();
}); });
...@@ -971,7 +971,7 @@ private: ...@@ -971,7 +971,7 @@ private:
class PipeWriteEnd final: public AsyncOutputStream { class PipeWriteEnd final: public AsyncOutputStream {
public: public:
PipeWriteEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {} PipeWriteEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeWriteEnd() { ~PipeWriteEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() { unwind.catchExceptionsIfUnwinding([&]() {
pipe->shutdownWrite(); pipe->shutdownWrite();
}); });
...@@ -999,7 +999,7 @@ class TwoWayPipeEnd final: public AsyncIoStream { ...@@ -999,7 +999,7 @@ class TwoWayPipeEnd final: public AsyncIoStream {
public: public:
TwoWayPipeEnd(kj::Own<AsyncPipe> in, kj::Own<AsyncPipe> out) TwoWayPipeEnd(kj::Own<AsyncPipe> in, kj::Own<AsyncPipe> out)
: in(kj::mv(in)), out(kj::mv(out)) {} : in(kj::mv(in)), out(kj::mv(out)) {}
~TwoWayPipeEnd() { ~TwoWayPipeEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() { unwind.catchExceptionsIfUnwinding([&]() {
out->shutdownWrite(); out->shutdownWrite();
in->abortRead(); in->abortRead();
...@@ -1060,9 +1060,10 @@ public: ...@@ -1060,9 +1060,10 @@ public:
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override { Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
if (limit == 0) return uint64_t(0); if (limit == 0) return uint64_t(0);
return inner->pumpTo(output, kj::min(amount, limit)) auto requested = kj::min(amount, limit);
.then([this,amount](uint64_t actual) { return inner->pumpTo(output, requested)
decreaseLimit(actual, amount); .then([this,requested](uint64_t actual) {
decreaseLimit(actual, requested);
return actual; return actual;
}); });
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment