Commit 7117175d authored by Kenton Varda's avatar Kenton Varda

Implement in-process byte stream pipes.

This lets you construct an AsyncInputStream / AsyncOutputStream pair that operates entirely within userspace, rather than pushing through a kernel-level pipe. This is far more efficient, avoiding system calls and reducing copies.

The pipe does not buffer at all. Instead, it waits for both a read() and a write() call to be active at the same time, and then it fulfills one with the other.

This implementation also optimizes pumps. Imagine the situation: you create a pipe; you call pumpTo() on the write end to pump it to some other; then you write to the write end of the pipe. In this case, the write will *directly* call the target stream to which the pipe is being pumped. Hence, adding daisy-chained pipes on top of a final output stream does not incur additional copies of the data. Similarly, tryPumpFrom() is optimized on the read end.
parent 0df23c4f
......@@ -661,5 +661,170 @@ KJ_TEST("Network::restrictPeers()") {
KJ_EXPECT(conn->readAllText().wait(w) == "");
}
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<char>(expected.size());
auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
}
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount));
}));
}
KJ_TEST("Userland pipe") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
char buf[4];
KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3);
buf[3] = '\0';
KJ_EXPECT(buf == "foo"_kj);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe cancel write") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto promise = pipe.out->write("foobar", 6);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
KJ_EXPECT(!promise.poll(ws));
promise = nullptr;
promise = pipe.out->write("baz", 3);
expectRead(*pipe.in, "baz").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
}
KJ_TEST("Userland pipe cancel read") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto writeOp = pipe.out->write("foo", 3);
auto readOp = expectRead(*pipe.in, "foobar");
writeOp.wait(ws);
KJ_EXPECT(!readOp.poll(ws));
readOp = nullptr;
auto writeOp2 = pipe.out->write("baz", 3);
expectRead(*pipe.in, "baz").wait(ws);
}
KJ_TEST("Userland pipe pumpTo") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
auto promise2 = pipe2.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe tryPumpFrom") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
auto promise2 = pipe2.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(!promise2.poll(ws));
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe pumpTo cancel") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
auto promise = pipe.out->write("foobar", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
// Cancel pump.
pumpPromise = nullptr;
auto promise3 = pipe2.out->write("baz", 3);
expectRead(*pipe2.in, "baz").wait(ws);
}
KJ_TEST("Userland pipe pumpTo cancel") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foobar", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
// Cancel pump.
pumpPromise = nullptr;
auto promise3 = pipe2.out->write("baz", 3);
expectRead(*pipe2.in, "baz").wait(ws);
}
} // namespace
} // namespace kj
......@@ -30,6 +30,7 @@
#include "debug.h"
#include "vector.h"
#include "io.h"
#include "one-of.h"
#if _WIN32
#include <winsock2.h>
......@@ -185,6 +186,922 @@ Maybe<Promise<uint64_t>> AsyncOutputStream::tryPumpFrom(
return nullptr;
}
namespace {
class AsyncPipe final: public AsyncIoStream, public Refcounted {
public:
~AsyncPipe() noexcept(false) {
KJ_REQUIRE(state == nullptr || ownState.get() != nullptr,
"destroying AsyncPipe with operation still in-progress; probably going to segfault") {
// Don't std::terminate().
break;
}
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (minBytes == 0) {
return size_t(0);
} else KJ_IF_MAYBE(s, state) {
return s->tryRead(buffer, minBytes, maxBytes);
} else {
return newAdaptedPromise<size_t, BlockedRead>(
*this, arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes), minBytes);
}
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
if (amount == 0) {
return uint64_t(0);
} else KJ_IF_MAYBE(s, state) {
return s->pumpTo(output, amount);
} else {
return newAdaptedPromise<uint64_t, BlockedPumpTo>(*this, output, amount);
}
}
void abortRead() override {
KJ_IF_MAYBE(s, state) {
s->abortRead();
} else {
ownState = kj::heap<AbortedRead>();
state = *ownState;
}
}
Promise<void> write(const void* buffer, size_t size) override {
if (size == 0) {
return READY_NOW;
} else KJ_IF_MAYBE(s, state) {
return s->write(buffer, size);
} else {
return newAdaptedPromise<void, BlockedWrite>(
*this, arrayPtr(reinterpret_cast<const byte*>(buffer), size), nullptr);
}
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
while (pieces.size() > 0 && pieces[0].size() == 0) {
pieces = pieces.slice(1, pieces.size());
}
if (pieces.size() == 0) {
return kj::READY_NOW;
} else KJ_IF_MAYBE(s, state) {
return s->write(pieces);
} else {
return newAdaptedPromise<void, BlockedWrite>(
*this, pieces[0], pieces.slice(1, pieces.size()));
}
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
if (amount == 0) {
return Promise<uint64_t>(uint64_t(0));
} else KJ_IF_MAYBE(s, state) {
return s->tryPumpFrom(input, amount);
} else {
return newAdaptedPromise<uint64_t, BlockedPumpFrom>(*this, input, amount);
}
}
void shutdownWrite() override {
KJ_IF_MAYBE(s, state) {
s->shutdownWrite();
} else {
ownState = kj::heap<ShutdownedWrite>();
state = *ownState;
}
}
private:
Maybe<AsyncIoStream&> state;
// Object-oriented state! If any method call is blocked waiting on activity from the other end,
// then `state` is non-null and method calls should be forwarded to it. If no calls are
// outstanding, `state` is null.
kj::Own<AsyncIoStream> ownState;
void endState(AsyncIoStream& obj) {
KJ_IF_MAYBE(s, state) {
if (s == &obj) {
state = nullptr;
}
}
}
class BlockedWrite final: public AsyncIoStream {
// AsyncPipe state when a write() is currently waiting for a corresponding read().
public:
BlockedWrite(PromiseFulfiller<void>& fulfiller, AsyncPipe& pipe,
ArrayPtr<const byte> writeBuffer,
ArrayPtr<const ArrayPtr<const byte>> morePieces)
: fulfiller(fulfiller), pipe(pipe), writeBuffer(writeBuffer), morePieces(morePieces) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedWrite() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto readBuffer = arrayPtr(reinterpret_cast<byte*>(readBufferPtr), maxBytes);
size_t totalRead = 0;
while (readBuffer.size() >= writeBuffer.size()) {
// The whole current write buffer can be copied into the read buffer.
{
auto n = writeBuffer.size();
memcpy(readBuffer.begin(), writeBuffer.begin(), n);
totalRead += n;
readBuffer = readBuffer.slice(n, readBuffer.size());
}
if (morePieces.size() == 0) {
// All done writing.
fulfiller.fulfill();
pipe.endState(*this);
if (totalRead >= minBytes) {
// Also all done reading.
return totalRead;
} else {
return pipe.tryRead(readBuffer.begin(), minBytes - totalRead, readBuffer.size())
.then([totalRead](size_t amount) { return amount + totalRead; });
}
}
writeBuffer = morePieces[0];
morePieces = morePieces.slice(1, morePieces.size());
}
// At this point, the read buffer is smaller than the current write buffer, so we can fill
// it completely.
{
auto n = readBuffer.size();
memcpy(readBuffer.begin(), writeBuffer.begin(), n);
writeBuffer = writeBuffer.slice(n, writeBuffer.size());
totalRead += n;
}
return totalRead;
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
if (amount < writeBuffer.size()) {
// Consume a portion of the write buffer.
return canceler.wrap(output.write(writeBuffer.begin(), amount)
.then([this,amount]() {
writeBuffer = writeBuffer.slice(amount, writeBuffer.size());
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
// First piece doesn't cover the whole pump. Figure out how many more pieces to add.
uint64_t actual = writeBuffer.size();
size_t i = 0;
while (i < morePieces.size() &&
amount >= actual + morePieces[i].size()) {
actual += morePieces[i++].size();
}
// Write the first piece.
auto promise = output.write(writeBuffer.begin(), writeBuffer.size());
// Write full pieces as a singcle gather-write.
if (i > 0) {
auto more = morePieces.slice(0, i);
promise = promise.then([&output,more]() { return output.write(more); });
}
if (i == morePieces.size()) {
// This will complete the write.
return canceler.wrap(promise.then([this,&output,amount,actual]() -> Promise<uint64_t> {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
if (actual == amount) {
// Oh, we had exactly enough.
return actual;
} else {
return pipe.pumpTo(output, amount - actual)
.then([actual](uint64_t actual2) { return actual + actual2; });
}
}));
} else {
// Pump ends mid-piece. Write the last, partial piece.
auto n = amount - actual;
auto splitPiece = morePieces[i];
KJ_ASSERT(n <= splitPiece.size());
auto newWriteBuffer = splitPiece.slice(n, splitPiece.size());
auto newMorePieces = morePieces.slice(i + 1, morePieces.size());
auto prefix = splitPiece.slice(0, n);
if (prefix.size() > 0) {
promise = promise.then([&output,prefix]() {
return output.write(prefix.begin(), prefix.size());
});
}
return canceler.wrap(promise.then([this,newWriteBuffer,newMorePieces,amount]() {
writeBuffer = newWriteBuffer;
morePieces = newMorePieces;
canceler.release();
return amount;
}));
}
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("can't write() again until previous write() completes");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("can't write() again until previous write() completes");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() again until previous write() completes");
}
void shutdownWrite() override {
KJ_FAIL_REQUIRE("can't shutdownWrite() until previous write() completes");
}
private:
PromiseFulfiller<void>& fulfiller;
AsyncPipe& pipe;
ArrayPtr<const byte> writeBuffer;
ArrayPtr<const ArrayPtr<const byte>> morePieces;
Canceler canceler;
};
class BlockedPumpFrom final: public AsyncIoStream {
// AsyncPipe state when a tryPumpFrom() is currently waiting for a corresponding read().
public:
BlockedPumpFrom(PromiseFulfiller<uint64_t>& fulfiller, AsyncPipe& pipe,
AsyncInputStream& input, uint64_t amount)
: fulfiller(fulfiller), pipe(pipe), input(input), amount(amount) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpFrom() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto pumpLeft = amount - pumpedSoFar;
auto min = kj::min(pumpLeft, minBytes);
auto max = kj::min(pumpLeft, maxBytes);
return canceler.wrap(input.tryRead(readBuffer, min, max)
.then([this,readBuffer,minBytes,maxBytes,min](size_t actual) -> kj::Promise<size_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount || actual < min) {
// Either we pumped all we wanted or we hit EOF.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
}
if (actual >= minBytes) {
return actual;
} else {
return pipe.read(reinterpret_cast<byte*>(readBuffer) + actual,
minBytes - actual, maxBytes - actual)
.then([actual](size_t actual2) { return actual + actual2; });
}
}));
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount2) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto n = kj::min(amount2, amount - pumpedSoFar);
return canceler.wrap(input.pumpTo(output, n)
.then([this,&output,amount2,n](uint64_t actual) -> Promise<uint64_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
KJ_ASSERT(actual <= amount2);
if (actual == amount2) {
// Completed entire pumpTo amount.
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return pipe.pumpTo(output, amount2 - actual);
}
}));
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("can't write() again until previous tryPumpFrom() completes");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("can't write() again until previous tryPumpFrom() completes");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() again until previous tryPumpFrom() completes");
}
void shutdownWrite() override {
KJ_FAIL_REQUIRE("can't shutdownWrite() until previous tryPumpFrom() completes");
}
private:
PromiseFulfiller<uint64_t>& fulfiller;
AsyncPipe& pipe;
AsyncInputStream& input;
uint64_t amount;
uint64_t pumpedSoFar = 0;
Canceler canceler;
};
class BlockedRead final: public AsyncIoStream {
// AsyncPipe state when a tryRead() is currently waiting for a corresponding write().
public:
BlockedRead(PromiseFulfiller<size_t>& fulfiller, AsyncPipe& pipe,
ArrayPtr<byte> readBuffer, size_t minBytes)
: fulfiller(fulfiller), pipe(pipe), readBuffer(readBuffer), minBytes(minBytes) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedRead() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("can't read() again until previous read() completes");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't read() again until previous read() completes");
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* writeBuffer, size_t size) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
if (size < readBuffer.size()) {
// Consume a portion of the read buffer.
memcpy(readBuffer.begin(), writeBuffer, size);
readSoFar += size;
readBuffer = readBuffer.slice(size, readBuffer.size());
if (readSoFar >= minBytes) {
// We've read enough to close out this read.
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
}
return READY_NOW;
} else {
// Consume entire read buffer.
auto n = readBuffer.size();
fulfiller.fulfill(readSoFar + n);
pipe.endState(*this);
memcpy(readBuffer.begin(), writeBuffer, n);
if (n == size) {
// That's it.
return READY_NOW;
} else {
return pipe.write(reinterpret_cast<const byte*>(writeBuffer) + n, size - n);
}
}
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
while (pieces.size() > 0) {
if (pieces[0].size() < readBuffer.size()) {
// Consume a portion of the read buffer.
auto n = pieces[0].size();
memcpy(readBuffer.begin(), pieces[0].begin(), n);
readSoFar += n;
readBuffer = readBuffer.slice(n, readBuffer.size());
pieces = pieces.slice(1, pieces.size());
// loop
} else {
// Consume entire read buffer.
auto n = readBuffer.size();
fulfiller.fulfill(readSoFar + n);
pipe.endState(*this);
memcpy(readBuffer.begin(), pieces[0].begin(), n);
auto restOfPiece = pieces[0].slice(n, pieces[0].size());
pieces = pieces.slice(1, pieces.size());
if (restOfPiece.size() == 0) {
// We exactly finished the current piece, so just issue a write for the remaining
// pieces.
if (pieces.size() == 0) {
// Nothing left.
return READY_NOW;
} else {
// Write remaining pieces.
return pipe.write(pieces);
}
} else {
// Unfortunately we have to execute a separate write() for the remaining part of this
// piece, because we can't modify the pieces array.
auto promise = pipe.write(restOfPiece.begin(), restOfPiece.size());
if (pieces.size() > 0) {
// No more pieces so that's it.
return kj::mv(promise);
} else {
// Also need to write the remaining pieces.
auto& pipeRef = pipe;
return promise.then([pieces,&pipeRef]() {
return pipeRef.write(pieces);
});
}
}
}
}
// Consumed all written pieces.
if (readSoFar >= minBytes) {
// We've read enough to close out this read.
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
}
return READY_NOW;
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
KJ_ASSERT(minBytes > readSoFar);
auto minToRead = kj::min(amount, minBytes - readSoFar);
auto maxToRead = kj::min(amount, readBuffer.size());
return canceler.wrap(input.tryRead(readBuffer.begin(), minToRead, maxToRead)
.then([this,&input,amount,minToRead](size_t actual) -> Promise<uint64_t> {
readBuffer = readBuffer.slice(actual, readBuffer.size());
readSoFar += actual;
if (readSoFar >= minBytes || actual < minToRead) {
// We've read enough to close out this read (readSoFar >= minBytes)
// OR we reached EOF and couldn't complete the read (actual < minToRead)
// Either way, we want to close out this read.
canceler.release();
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
if (actual < amount) {
// We din't complete pumping. Restart from the pipe.
return input.pumpTo(pipe, amount - actual)
.then([actual](size_t actual2) { return actual + actual2; });
}
}
// If we read less than `actual`, but more than `minToRead`, it can only have been
// because we reached `minBytes`, so the conditional above would have executed. So, here
// we know that actual == amount.
KJ_ASSERT(actual == amount);
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
void shutdownWrite() override {
canceler.cancel("shutdownWrite() was called");
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
pipe.shutdownWrite();
}
private:
PromiseFulfiller<size_t>& fulfiller;
AsyncPipe& pipe;
ArrayPtr<byte> readBuffer;
size_t minBytes;
size_t readSoFar = 0;
Canceler canceler;
};
class BlockedPumpTo final: public AsyncIoStream {
// AsyncPipe state when a pumpTo() is currently waiting for a corresponding write().
public:
BlockedPumpTo(PromiseFulfiller<size_t>& fulfiller, AsyncPipe& pipe,
AsyncOutputStream& output, uint64_t amount)
: fulfiller(fulfiller), pipe(pipe), output(output), amount(amount) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpTo() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("can't read() again until previous pumpTo() completes");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't read() again until previous pumpTo() completes");
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* writeBuffer, size_t size) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto actual = kj::min(amount - pumpedSoFar, size);
return canceler.wrap(output.write(writeBuffer, actual)
.then([this,size,actual,writeBuffer]() -> kj::Promise<void> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
KJ_ASSERT(actual <= size);
if (pumpedSoFar == amount) {
// Done with pump.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
}
if (actual == size) {
return kj::READY_NOW;
} else {
KJ_ASSERT(pumpedSoFar == amount);
return pipe.write(reinterpret_cast<const byte*>(writeBuffer) + actual, size - actual);
}
}));
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
size_t size = 0;
size_t needed = amount - pumpedSoFar;
for (auto i: kj::indices(pieces)) {
if (pieces[i].size() > needed) {
// The pump ends in the middle of this write.
auto promise = output.write(pieces.slice(0, i));
if (needed > 0) {
// The pump includes part of this piece, but not all. Unfortunately we need to split
// writes.
auto partial = pieces[i].slice(0, needed);
promise = promise.then([this,partial]() {
return output.write(partial.begin(), partial.size());
});
auto partial2 = pieces[i].slice(needed, pieces[i].size());
promise = canceler.wrap(promise.then([this,partial2]() {
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
return pipe.write(partial2.begin(), partial2.size());
}));
++i;
} else {
// The pump ends exactly at the end of a piece, how nice.
promise = canceler.wrap(promise.then([this]() {
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}));
}
auto remainder = pieces.slice(i + 1, pieces.size());
if (remainder.size() > 0) {
auto& pipeRef = pipe;
promise = promise.then([&pipeRef,remainder]() {
return pipeRef.write(remainder);
});
}
return promise;
} else {
size += pieces[i].size();
needed -= pieces[i].size();
}
}
// Turns out we can forward this whole write.
KJ_ASSERT(pumpedSoFar + size == amount);
return canceler.wrap(output.write(pieces).then([this,size]() {
pumpedSoFar += size;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
// Done pumping.
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
}));
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount2) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto n = kj::min(amount2, amount - pumpedSoFar);
return output.tryPumpFrom(input, n)
.map([&](Promise<uint64_t> subPump) {
return canceler.wrap(subPump
.then([this,&input,amount2,n](uint64_t actual) -> Promise<uint64_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
KJ_ASSERT(actual <= amount2);
if (actual == amount2) {
// Completed entire tryPumpFrom amount.
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return input.pumpTo(pipe, amount2 - actual);
}
}));
});
}
void shutdownWrite() override {
canceler.cancel("shutdownWrite() was called");
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
pipe.shutdownWrite();
}
private:
PromiseFulfiller<size_t>& fulfiller;
AsyncPipe& pipe;
AsyncOutputStream& output;
uint64_t amount;
size_t pumpedSoFar = 0;
Canceler canceler;
};
class AbortedRead final: public AsyncIoStream {
// AsyncPipe state when abortRead() has been called.
public:
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
void abortRead() override {
// ignore repeated abort
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
void shutdownWrite() override {
// ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped,
// which is not an error even if reads have been aborted.
}
};
class ShutdownedWrite final: public AsyncIoStream {
// AsyncPipe state when shutdownWrite() has been called.
public:
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return uint64_t(0);
}
void abortRead() override {
// ignore
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
void shutdownWrite() override {
// ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped,
// so it will only be called once anyhow.
}
};
};
class PipeReadEnd final: public AsyncInputStream {
public:
PipeReadEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeReadEnd() {
unwind.catchExceptionsIfUnwinding([&]() {
pipe->abortRead();
});
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return pipe->tryRead(buffer, minBytes, maxBytes);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return pipe->pumpTo(output, amount);
}
private:
Own<AsyncPipe> pipe;
UnwindDetector unwind;
};
class PipeWriteEnd final: public AsyncOutputStream {
public:
PipeWriteEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeWriteEnd() {
unwind.catchExceptionsIfUnwinding([&]() {
pipe->shutdownWrite();
});
}
Promise<void> write(const void* buffer, size_t size) override {
return pipe->write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return pipe->write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
return pipe->tryPumpFrom(input, amount);
}
private:
Own<AsyncPipe> pipe;
UnwindDetector unwind;
};
class TwoWayPipeEnd final: public AsyncIoStream {
public:
TwoWayPipeEnd(kj::Own<AsyncPipe> in, kj::Own<AsyncPipe> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
~TwoWayPipeEnd() {
unwind.catchExceptionsIfUnwinding([&]() {
out->shutdownWrite();
in->abortRead();
});
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return in->tryRead(buffer, minBytes, maxBytes);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return in->pumpTo(output, amount);
}
void abortRead() override {
in->abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
return out->write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return out->write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
return out->tryPumpFrom(input, amount);
}
void shutdownWrite() override {
out->shutdownWrite();
}
private:
kj::Own<AsyncPipe> in;
kj::Own<AsyncPipe> out;
UnwindDetector unwind;
};
class LimitedInputStream final: public AsyncInputStream {
public:
LimitedInputStream(kj::Own<AsyncInputStream> inner, uint64_t limit)
: inner(kj::mv(inner)), limit(limit) {
if (limit == 0) {
inner = nullptr;
}
}
Maybe<uint64_t> tryGetLength() override {
return limit;
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (limit == 0) return size_t(0);
return inner->tryRead(buffer, kj::min(minBytes, limit), kj::min(maxBytes, limit))
.then([this,minBytes](size_t actual) {
decreaseLimit(actual, minBytes);
return actual;
});
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
if (limit == 0) return uint64_t(0);
return inner->pumpTo(output, kj::min(amount, limit))
.then([this,amount](uint64_t actual) {
decreaseLimit(actual, amount);
return actual;
});
}
private:
Own<AsyncInputStream> inner;
uint64_t limit;
void decreaseLimit(uint64_t amount, uint64_t requested) {
KJ_ASSERT(limit >= amount);
limit -= amount;
if (limit == 0) {
inner = nullptr;
} else if (amount < requested) {
KJ_FAIL_REQUIRE("pipe ended prematurely");
}
}
};
} // namespace
OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength) {
auto impl = kj::refcounted<AsyncPipe>();
Own<AsyncInputStream> readEnd = kj::heap<PipeReadEnd>(kj::addRef(*impl));
KJ_IF_MAYBE(l, expectedLength) {
readEnd = kj::heap<LimitedInputStream>(kj::mv(readEnd), *l);
}
Own<AsyncOutputStream> writeEnd = kj::heap<PipeWriteEnd>(kj::mv(impl));
return { kj::mv(readEnd), kj::mv(writeEnd) };
}
TwoWayPipe newTwoWayPipe() {
auto pipe1 = kj::refcounted<AsyncPipe>();
auto pipe2 = kj::refcounted<AsyncPipe>();
auto end1 = kj::heap<TwoWayPipeEnd>(kj::addRef(*pipe1), kj::addRef(*pipe2));
auto end2 = kj::heap<TwoWayPipeEnd>(kj::mv(pipe2), kj::mv(pipe1));
return { { kj::mv(end1), kj::mv(end2) } };
}
Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() {
return tryReceiveStream()
.then([](Maybe<Own<AsyncCapabilityStream>>&& result)
......
......@@ -175,6 +175,13 @@ struct OneWayPipe {
Own<AsyncOutputStream> out;
};
OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr);
// Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits
// until both a read() and a write() call are pending, then resolves both.
//
// If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many
// bytes. The input end's `tryGetLength()` will return the number of bytes left.
struct TwoWayPipe {
// A data pipe that supports sending in both directions. Each end's output sends data to the
// other end's input. (Typically backed by socketpair() system call.)
......@@ -182,6 +189,10 @@ struct TwoWayPipe {
Own<AsyncIoStream> ends[2];
};
TwoWayPipe newTwoWayPipe();
// Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits
// until both a read() and a write() call are pending, then resolves both.
struct CapabilityPipe {
// Like TwoWayPipe but allowing capability-passing.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment