Unverified Commit 46ed8793 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #741 from capnproto/harris/tee

Implement kj::newTee()
parents 147c0e54 c281088c
...@@ -24,10 +24,10 @@ matrix: ...@@ -24,10 +24,10 @@ matrix:
sources: sources:
- ubuntu-toolchain-r-test - ubuntu-toolchain-r-test
packages: packages:
- g++-4.9 - g++-5
env: env:
- MATRIX_CC=gcc-4.9 - MATRIX_CC=gcc-5
- MATRIX_CXX=g++-4.9 - MATRIX_CXX=g++-5
# New GCC # New GCC
- os: linux - os: linux
......
...@@ -447,6 +447,7 @@ TEST(Array, AttachNested) { ...@@ -447,6 +447,7 @@ TEST(Array, AttachNested) {
Array<Own<DestructionOrderRecorder>> combined = arr.attach(kj::mv(obj2)).attach(kj::mv(obj3)); Array<Own<DestructionOrderRecorder>> combined = arr.attach(kj::mv(obj2)).attach(kj::mv(obj3));
KJ_EXPECT(combined.begin() == ptr); KJ_EXPECT(combined.begin() == ptr);
KJ_EXPECT(combined.size() == 1);
KJ_EXPECT(obj1.get() == nullptr); KJ_EXPECT(obj1.get() == nullptr);
KJ_EXPECT(obj2.get() == nullptr); KJ_EXPECT(obj2.get() == nullptr);
......
...@@ -862,6 +862,7 @@ template <typename T> ...@@ -862,6 +862,7 @@ template <typename T>
template <typename... Attachments> template <typename... Attachments>
Array<T> Array<T>::attach(Attachments&&... attachments) { Array<T> Array<T>::attach(Attachments&&... attachments) {
T* ptrCopy = ptr; T* ptrCopy = ptr;
auto sizeCopy = size_;
KJ_IREQUIRE(ptrCopy != nullptr, "cannot attach to null pointer"); KJ_IREQUIRE(ptrCopy != nullptr, "cannot attach to null pointer");
...@@ -872,7 +873,7 @@ Array<T> Array<T>::attach(Attachments&&... attachments) { ...@@ -872,7 +873,7 @@ Array<T> Array<T>::attach(Attachments&&... attachments) {
auto bundle = new _::ArrayDisposableOwnedBundle<Array<T>, Attachments...>( auto bundle = new _::ArrayDisposableOwnedBundle<Array<T>, Attachments...>(
kj::mv(*this), kj::fwd<Attachments>(attachments)...); kj::mv(*this), kj::fwd<Attachments>(attachments)...);
return Array<T>(ptrCopy, size_, *bundle); return Array<T>(ptrCopy, sizeCopy, *bundle);
} }
template <typename T> template <typename T>
......
...@@ -681,7 +681,7 @@ kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { ...@@ -681,7 +681,7 @@ kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
})); }));
} }
class MockAsyncInputStream: public AsyncInputStream { class MockAsyncInputStream final: public AsyncInputStream {
public: public:
MockAsyncInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize) MockAsyncInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize)
: bytes(bytes), blockSize(blockSize) {} : bytes(bytes), blockSize(blockSize) {}
...@@ -1375,5 +1375,733 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") { ...@@ -1375,5 +1375,733 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
pipe2.out = nullptr; pipe2.out = nullptr;
} }
constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
KJ_TEST("Userland tee") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto tee = newTee(kj::mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto writePromise = pipe.out->write("foobar", 6);
expectRead(*left, "foobar").wait(ws);
writePromise.wait(ws);
expectRead(*right, "foobar").wait(ws);
}
KJ_TEST("Userland tee concurrent read") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto tee = newTee(kj::mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
uint8_t leftBuf[6] = { 0 };
uint8_t rightBuf[6] = { 0 };
auto leftPromise = left->tryRead(leftBuf, 6, 6);
auto rightPromise = right->tryRead(rightBuf, 6, 6);
KJ_EXPECT(!leftPromise.poll(ws));
KJ_EXPECT(!rightPromise.poll(ws));
pipe.out->write("foobar", 6).wait(ws);
KJ_EXPECT(leftPromise.wait(ws) == 6);
KJ_EXPECT(rightPromise.wait(ws) == 6);
KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
}
KJ_TEST("Userland tee cancel and restart read") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto tee = newTee(kj::mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto writePromise = pipe.out->write("foobar", 6);
{
// Initiate a read and immediately cancel it.
uint8_t buf[6] = { 0 };
auto promise = left->tryRead(buf, 6, 6);
}
// Subsequent reads still see the full data.
expectRead(*left, "foobar").wait(ws);
writePromise.wait(ws);
expectRead(*right, "foobar").wait(ws);
}
KJ_TEST("Userland tee cancel read and destroy branch then read other branch") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto tee = newTee(kj::mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto writePromise = pipe.out->write("foobar", 6);
{
// Initiate a read and immediately cancel it.
uint8_t buf[6] = { 0 };
auto promise = left->tryRead(buf, 6, 6);
}
// And destroy the branch for good measure.
left = nullptr;
// Subsequent reads on the other branch still see the full data.
expectRead(*right, "foobar").wait(ws);
writePromise.wait(ws);
}
KJ_TEST("Userland tee subsequent other-branch reads are READY_NOW") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto tee = newTee(kj::mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
uint8_t leftBuf[6] = { 0 };
auto leftPromise = left->tryRead(leftBuf, 6, 6);
// This is the first read, so there should NOT be buffered data.
KJ_EXPECT(!leftPromise.poll(ws));
pipe.out->write("foobar", 6).wait(ws);
leftPromise.wait(ws);
KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
uint8_t rightBuf[6] = { 0 };
auto rightPromise = right->tryRead(rightBuf, 6, 6);
// The left read promise was fulfilled, so there SHOULD be buffered data.
KJ_EXPECT(rightPromise.poll(ws));
rightPromise.wait(ws);
KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
}
KJ_TEST("Userland tee read EOF propagation") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto writePromise = pipe.out->write("foobar", 6);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
// Lengthless pipe, so ...
KJ_EXPECT(left->tryGetLength() == nullptr);
KJ_EXPECT(right->tryGetLength() == nullptr);
uint8_t leftBuf[7] = { 0 };
auto leftPromise = left->tryRead(leftBuf, size(leftBuf), size(leftBuf));
writePromise.wait(ws);
// Destroying the output side should force a short read.
pipe.out = nullptr;
KJ_EXPECT(leftPromise.wait(ws) == 6);
KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
// And we should see a short read here, too.
uint8_t rightBuf[7] = { 0 };
auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf));
KJ_EXPECT(rightPromise.wait(ws) == 6);
KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
// Further reads should all be short.
KJ_EXPECT(left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws) == 0);
KJ_EXPECT(right->tryRead(rightBuf, 1, size(rightBuf)).wait(ws) == 0);
}
KJ_TEST("Userland tee read exception propagation") {
kj::EventLoop loop;
WaitScope ws(loop);
// Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
// ended prematurely" exception when we destroy the output side early.
auto pipe = newOneWayPipe(7);
auto writePromise = pipe.out->write("foobar", 6);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
// Test tryGetLength() while we're at it.
KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 7);
KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7);
uint8_t leftBuf[7] = { 0 };
auto leftPromise = left->tryRead(leftBuf, 6, size(leftBuf));
writePromise.wait(ws);
// Destroying the output side should force a fulfillment of the read (since we reached minBytes).
pipe.out = nullptr;
KJ_EXPECT(leftPromise.wait(ws) == 6);
KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0);
// The next read sees the exception.
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws));
// Test tryGetLength() here -- the unread branch still sees the original length value.
KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 1);
KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7);
// We should see the buffered data on the other side, even though we don't reach our minBytes.
uint8_t rightBuf[7] = { 0 };
auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf));
KJ_EXPECT(rightPromise.wait(ws) == 6);
KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
right->tryRead(rightBuf, 1, size(leftBuf)).wait(ws));
// Further reads should all see the exception again.
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
right->tryRead(rightBuf, 1, size(leftBuf)).wait(ws));
}
KJ_TEST("Userland tee read exception propagation w/ data loss") {
kj::EventLoop loop;
WaitScope ws(loop);
// Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
// ended prematurely" exception once the pipe sees a short read.
auto pipe = newOneWayPipe(7);
auto writePromise = pipe.out->write("foobar", 6);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
uint8_t leftBuf[7] = { 0 };
auto leftPromise = left->tryRead(leftBuf, 7, 7);
writePromise.wait(ws);
// Destroying the output side should force an exception, since we didn't reach our minBytes.
pipe.out = nullptr;
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPromise.wait(ws));
// And we should see a short read here, too. In fact, we shouldn't see anything: the short read
// above read all of the pipe's data, but then failed to buffer it because it encountered an
// exception. It buffered the exception, instead.
uint8_t rightBuf[7] = { 0 };
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely",
right->tryRead(rightBuf, 1, 1).wait(ws));
}
KJ_TEST("Userland tee read into different buffer sizes") {
kj::EventLoop loop;
WaitScope ws(loop);
auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
uint8_t leftBuf[5] = { 0 };
uint8_t rightBuf[11] = { 0 };
auto leftPromise = left->tryRead(leftBuf, 5, 5);
auto rightPromise = right->tryRead(rightBuf, 11, 11);
KJ_EXPECT(leftPromise.wait(ws) == 5);
KJ_EXPECT(rightPromise.wait(ws) == 11);
}
KJ_TEST("Userland tee reads see max(minBytes...) and min(maxBytes...)") {
kj::EventLoop loop;
WaitScope ws(loop);
auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
{
uint8_t leftBuf[5] = { 0 };
uint8_t rightBuf[11] = { 0 };
// Subrange of another range. The smaller maxBytes should win.
auto leftPromise = left->tryRead(leftBuf, 3, 5);
auto rightPromise = right->tryRead(rightBuf, 1, 11);
KJ_EXPECT(leftPromise.wait(ws) == 5);
KJ_EXPECT(rightPromise.wait(ws) == 5);
}
{
uint8_t leftBuf[5] = { 0 };
uint8_t rightBuf[11] = { 0 };
// Disjoint ranges. The larger minBytes should win.
auto leftPromise = left->tryRead(leftBuf, 3, 5);
auto rightPromise = right->tryRead(rightBuf, 6, 11);
KJ_EXPECT(leftPromise.wait(ws) == 5);
KJ_EXPECT(rightPromise.wait(ws) == 6);
KJ_EXPECT(left->tryRead(leftBuf, 1, 2).wait(ws) == 1);
}
}
KJ_TEST("Userland tee read stress test") {
kj::EventLoop loop;
WaitScope ws(loop);
auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftBuffer = heapArray<byte>(bigText.size());
{
auto leftSlice = leftBuffer.slice(0, leftBuffer.size());
while (leftSlice.size() > 0) {
for (size_t blockSize: { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59 }) {
if (leftSlice.size() == 0) break;
auto maxBytes = min(blockSize, leftSlice.size());
auto amount = left->tryRead(leftSlice.begin(), 1, maxBytes).wait(ws);
leftSlice = leftSlice.slice(amount, leftSlice.size());
}
}
}
KJ_EXPECT(memcmp(leftBuffer.begin(), bigText.begin(), leftBuffer.size()) == 0);
KJ_EXPECT(right->readAllText().wait(ws) == bigText);
}
KJ_TEST("Userland tee pump") {
kj::EventLoop loop;
WaitScope ws(loop);
auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out, 7);
KJ_EXPECT(!leftPumpPromise.poll(ws));
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
// Neither are ready yet, because the left pump's backpressure has blocked the AsyncTee's pull
// loop until we read from leftPipe.
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
expectRead(*leftPipe.in, "foo bar").wait(ws);
KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
KJ_EXPECT(!rightPumpPromise.poll(ws));
// We should be able to read up to how far the left side pumped, and beyond. The left side will
// now have data in its buffer.
expectRead(*rightPipe.in, "foo bar baz,foo bar baz,foo").wait(ws);
// Consume the left side buffer.
expectRead(*left, " baz,foo bar").wait(ws);
// We can destroy the left branch entirely and the right branch will still see all data.
left = nullptr;
KJ_EXPECT(!rightPumpPromise.poll(ws));
auto allTextPromise = rightPipe.in->readAllText();
KJ_EXPECT(rightPumpPromise.wait(ws) == bigText.size());
// Need to force an EOF in the right pipe to check the result.
rightPipe.out = nullptr;
KJ_EXPECT(allTextPromise.wait(ws) == bigText.slice(27));
}
KJ_TEST("Userland tee pump slows down reads") {
kj::EventLoop loop;
WaitScope ws(loop);
auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
// The left pump will cause some data to be buffered on the right branch, which we can read.
auto rightExpectation0 = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE));
expectRead(*right, rightExpectation0).wait(ws);
// But the next right branch read is blocked by the left pipe's backpressure.
auto rightExpectation1 = kj::str(bigText.slice(TEE_MAX_CHUNK_SIZE, TEE_MAX_CHUNK_SIZE + 10));
auto rightPromise = expectRead(*right, rightExpectation1);
KJ_EXPECT(!rightPromise.poll(ws));
// The right branch read finishes when we relieve the pressure in the left pipe.
auto allTextPromise = leftPipe.in->readAllText();
rightPromise.wait(ws);
KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size());
leftPipe.out = nullptr;
KJ_EXPECT(allTextPromise.wait(ws) == bigText);
}
KJ_TEST("Userland tee pump EOF propagation") {
kj::EventLoop loop;
WaitScope ws(loop);
{
// EOF encountered by two pump operations.
auto pipe = newOneWayPipe();
auto writePromise = pipe.out->write("foo bar", 7);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
// Pump the first bit, and block.
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
writePromise.wait(ws);
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
// Induce an EOF. We should see it propagated to both pump promises.
pipe.out = nullptr;
// Relieve backpressure.
auto leftAllPromise = leftPipe.in->readAllText();
auto rightAllPromise = rightPipe.in->readAllText();
KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
KJ_EXPECT(rightPumpPromise.wait(ws) == 7);
// Make sure we got the data on the pipes that were being pumped to.
KJ_EXPECT(!leftAllPromise.poll(ws));
KJ_EXPECT(!rightAllPromise.poll(ws));
leftPipe.out = nullptr;
rightPipe.out = nullptr;
KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar");
}
{
// EOF encountered by a read and pump operation.
auto pipe = newOneWayPipe();
auto writePromise = pipe.out->write("foo bar", 7);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
// Pump one branch, read another.
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
expectRead(*right, "foo bar").wait(ws);
writePromise.wait(ws);
uint8_t dummy = 0;
auto rightReadPromise = right->tryRead(&dummy, 1, 1);
// Induce an EOF. We should see it propagated to both the read and pump promises.
pipe.out = nullptr;
// Relieve backpressure in the tee to see the EOF.
auto leftAllPromise = leftPipe.in->readAllText();
KJ_EXPECT(leftPumpPromise.wait(ws) == 7);
KJ_EXPECT(rightReadPromise.wait(ws) == 0);
// Make sure we got the data on the pipe that was being pumped to.
KJ_EXPECT(!leftAllPromise.poll(ws));
leftPipe.out = nullptr;
KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
}
}
KJ_TEST("Userland tee pump EOF on chunk boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
// Conjure an EOF right on the boundary of the tee's internal chunk.
auto chunkText = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE));
auto tee = newTee(heap<MockAsyncInputStream>(chunkText.asBytes(), chunkText.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
auto leftAllPromise = leftPipe.in->readAllText();
auto rightAllPromise = rightPipe.in->readAllText();
// The pumps should see the EOF and stop.
KJ_EXPECT(leftPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE);
KJ_EXPECT(rightPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE);
// Verify that we saw the data on the other end of the destination pipes.
leftPipe.out = nullptr;
rightPipe.out = nullptr;
KJ_EXPECT(leftAllPromise.wait(ws) == chunkText);
KJ_EXPECT(rightAllPromise.wait(ws) == chunkText);
}
KJ_TEST("Userland tee pump read exception propagation") {
kj::EventLoop loop;
WaitScope ws(loop);
{
// Exception encountered by two pump operations.
auto pipe = newOneWayPipe(14);
auto writePromise = pipe.out->write("foo bar", 7);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
// Pump the first bit, and block.
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
writePromise.wait(ws);
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
// Induce a read exception. We should see it propagated to both pump promises.
pipe.out = nullptr;
// Both promises must exist before the backpressure in the tee is relieved, and the tee pull
// loop actually sees the exception.
auto leftAllPromise = leftPipe.in->readAllText();
auto rightAllPromise = rightPipe.in->readAllText();
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPumpPromise.wait(ws));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", rightPumpPromise.wait(ws));
// Make sure we got the data on the destination pipes.
KJ_EXPECT(!leftAllPromise.poll(ws));
KJ_EXPECT(!rightAllPromise.poll(ws));
leftPipe.out = nullptr;
rightPipe.out = nullptr;
KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar");
}
{
// Exception encountered by a read and pump operation.
auto pipe = newOneWayPipe(14);
auto writePromise = pipe.out->write("foo bar", 7);
auto tee = newTee(mv(pipe.in));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
// Pump one branch, read another.
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
expectRead(*right, "foo bar").wait(ws);
writePromise.wait(ws);
uint8_t dummy = 0;
auto rightReadPromise = right->tryRead(&dummy, 1, 1);
// Induce a read exception. We should see it propagated to both the read and pump promises.
pipe.out = nullptr;
// Relieve backpressure in the tee to see the exceptions.
auto leftAllPromise = leftPipe.in->readAllText();
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPumpPromise.wait(ws));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", rightReadPromise.wait(ws));
// Make sure we got the data on the destination pipe.
KJ_EXPECT(!leftAllPromise.poll(ws));
leftPipe.out = nullptr;
KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar");
}
}
KJ_TEST("Userland tee pump write exception propagation") {
kj::EventLoop loop;
WaitScope ws(loop);
auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ",");
auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
// Set up two pumps and let them block.
auto leftPipe = newOneWayPipe();
auto rightPipe = newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
// Induce a write exception in the right branch pump. It should propagate to the right pump
// promise.
rightPipe.in = nullptr;
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", rightPumpPromise.wait(ws));
// The left pump promise does not see the right branch's write exception.
KJ_EXPECT(!leftPumpPromise.poll(ws));
auto allTextPromise = leftPipe.in->readAllText();
KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size());
leftPipe.out = nullptr;
KJ_EXPECT(allTextPromise.wait(ws) == bigText);
}
KJ_TEST("Userland tee pump cancellation implies write cancellation") {
kj::EventLoop loop;
WaitScope ws(loop);
auto text = "foo bar baz"_kj;
auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()));
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
// Arrange to block the left pump on its write operation.
expectRead(*right, "foo ").wait(ws);
KJ_EXPECT(!leftPumpPromise.poll(ws));
// Then cancel the pump, while it's still blocked.
leftPumpPromise = nullptr;
// It should cancel its write operations, so it should now be safe to destroy the output stream to
// which it was pumping.
try {
leftPipe.out = nullptr;
} catch (const Exception& exception) {
KJ_FAIL_EXPECT("write promises were not canceled", exception);
}
}
KJ_TEST("Userland tee buffer size limit") {
kj::EventLoop loop;
WaitScope ws(loop);
auto text = "foo bar baz"_kj;
{
// We can carefully read data to stay under our ridiculously low limit.
auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
expectRead(*left, "fo").wait(ws);
expectRead(*right, "foo ").wait(ws);
expectRead(*left, "o ba").wait(ws);
expectRead(*right, "bar ").wait(ws);
expectRead(*left, "r ba").wait(ws);
expectRead(*right, "baz").wait(ws);
expectRead(*left, "z").wait(ws);
}
{
// Exceeding the limit causes both branches to see the exception after exhausting their buffers.
auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
expectRead(*left, "fo").wait(ws);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
expectRead(*left, "o").wait(ws));
expectRead(*right, "fo").wait(ws);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded",
expectRead(*right, "o").wait(ws));
}
{
// We guarantee that two pumps started simultaneously will never exceed our buffer size limit.
auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2);
auto left = kj::mv(tee.branches[0]);
auto right = kj::mv(tee.branches[1]);
auto leftPipe = kj::newOneWayPipe();
auto rightPipe = kj::newOneWayPipe();
auto leftPumpPromise = left->pumpTo(*leftPipe.out);
auto rightPumpPromise = right->pumpTo(*rightPipe.out);
KJ_EXPECT(!leftPumpPromise.poll(ws));
KJ_EXPECT(!rightPumpPromise.poll(ws));
uint8_t leftBuf[11] = { 0 };
uint8_t rightBuf[11] = { 0 };
// The first read on the left pipe will succeed.
auto leftPromise = leftPipe.in->tryRead(leftBuf, 1, 11);
KJ_EXPECT(leftPromise.wait(ws) == 2);
KJ_EXPECT(memcmp(leftBuf, text.begin(), 2) == 0);
// But the second will block until we relieve pressure on the right pipe.
leftPromise = leftPipe.in->tryRead(leftBuf + 2, 1, 9);
KJ_EXPECT(!leftPromise.poll(ws));
// Relieve the right pipe pressure ...
auto rightPromise = rightPipe.in->tryRead(rightBuf, 1, 11);
KJ_EXPECT(rightPromise.wait(ws) == 2);
KJ_EXPECT(memcmp(rightBuf, text.begin(), 2) == 0);
// Now the second left pipe read will complete.
KJ_EXPECT(leftPromise.wait(ws) == 2);
KJ_EXPECT(memcmp(leftBuf, text.begin(), 4) == 0);
// Leapfrog the left branch with the right. There should be 2 bytes in the buffer, so we can
// demand a total of 4.
rightPromise = rightPipe.in->tryRead(rightBuf + 2, 4, 9);
KJ_EXPECT(rightPromise.wait(ws) == 4);
KJ_EXPECT(memcmp(rightBuf, text.begin(), 6) == 0);
// Leapfrog the right with the left. We demand the entire rest of the stream, so this should
// block. Note that a regular read for this amount on one of the tee branches directly would
// exceed our buffer size limit, but this one does not, because we have the pipe to regulate
// backpressure for us.
leftPromise = leftPipe.in->tryRead(leftBuf + 4, 7, 7);
KJ_EXPECT(!leftPromise.poll(ws));
// Ask for the entire rest of the stream on the right branch and wrap things up.
rightPromise = rightPipe.in->tryRead(rightBuf + 6, 5, 5);
KJ_EXPECT(leftPromise.wait(ws) == 7);
KJ_EXPECT(memcmp(leftBuf, text.begin(), 11) == 0);
KJ_EXPECT(rightPromise.wait(ws) == 5);
KJ_EXPECT(memcmp(rightBuf, text.begin(), 11) == 0);
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "vector.h" #include "vector.h"
#include "io.h" #include "io.h"
#include "one-of.h" #include "one-of.h"
#include <deque>
#if _WIN32 #if _WIN32
#include <winsock2.h> #include <winsock2.h>
...@@ -1125,6 +1126,629 @@ TwoWayPipe newTwoWayPipe() { ...@@ -1125,6 +1126,629 @@ TwoWayPipe newTwoWayPipe() {
return { { kj::mv(end1), kj::mv(end2) } }; return { { kj::mv(end1), kj::mv(end2) } };
} }
namespace {
class AsyncTee final: public Refcounted {
public:
using BranchId = uint;
explicit AsyncTee(Own<AsyncInputStream> inner, uint64_t bufferSizeLimit)
: inner(mv(inner)), bufferSizeLimit(bufferSizeLimit), length(this->inner->tryGetLength()) {}
~AsyncTee() noexcept(false) {
bool hasBranches = false;
for (auto& branch: branches) {
hasBranches = hasBranches || branch != nullptr;
}
KJ_ASSERT(!hasBranches, "destroying AsyncTee with branch still alive") {
// Don't std::terminate().
break;
}
}
void addBranch(BranchId branch) {
KJ_REQUIRE(branches[branch] == nullptr, "branch already exists");
branches[branch] = Branch();
}
void removeBranch(BranchId branch) {
auto& state = KJ_REQUIRE_NONNULL(branches[branch], "branch was already destroyed");
KJ_REQUIRE(state.sink == nullptr,
"destroying tee branch with operation still in-progress; probably going to segfault") {
// Don't std::terminate().
break;
}
branches[branch] = nullptr;
}
Promise<size_t> tryRead(BranchId branch, void* buffer, size_t minBytes, size_t maxBytes) {
auto& state = KJ_ASSERT_NONNULL(branches[branch]);
KJ_ASSERT(state.sink == nullptr);
// If there is excess data in the buffer for us, slurp that up.
auto readBuffer = arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes);
auto readSoFar = state.buffer.consume(readBuffer, minBytes);
if (minBytes == 0) {
return readSoFar;
}
if (state.buffer.empty()) {
KJ_IF_MAYBE(reason, stoppage) {
// Prefer a short read to an exception. The exception prevents the pull loop from adding any
// data to the buffer, so `readSoFar` will be zero the next time someone calls `tryRead()`,
// and the caller will see the exception.
if (reason->is<Eof>() || readSoFar > 0) {
return readSoFar;
}
return cp(reason->get<Exception>());
}
}
auto promise = newAdaptedPromise<size_t, ReadSink>(state.sink, readBuffer, minBytes, readSoFar);
ensurePulling();
return mv(promise);
}
Maybe<uint64_t> tryGetLength(BranchId branch) {
auto& state = KJ_ASSERT_NONNULL(branches[branch]);
return length.map([&state](uint64_t amount) {
return amount + state.buffer.size();
});
}
Promise<uint64_t> pumpTo(BranchId branch, AsyncOutputStream& output, uint64_t amount) {
auto& state = KJ_ASSERT_NONNULL(branches[branch]);
KJ_ASSERT(state.sink == nullptr);
if (amount == 0) {
return amount;
}
if (state.buffer.empty()) {
KJ_IF_MAYBE(reason, stoppage) {
if (reason->is<Eof>()) {
return uint64_t(0);
}
return cp(reason->get<Exception>());
}
}
auto promise = newAdaptedPromise<uint64_t, PumpSink>(state.sink, output, amount);
ensurePulling();
return mv(promise);
}
private:
struct Eof {};
using Stoppage = OneOf<Eof, Exception>;
class Buffer {
public:
uint64_t consume(ArrayPtr<byte>& readBuffer, size_t& minBytes);
// Consume as many bytes as possible, copying them into `readBuffer`. Return the number of bytes
// consumed.
//
// `readBuffer` and `minBytes` are both assigned appropriate new values, such that after any
// call to `consume()`, `readBuffer` will point to the remaining slice of unwritten space, and
// `minBytes` will have been decremented (clamped to zero) by the amount of bytes read. That is,
// the read can be considered fulfilled if `minBytes` is zero after a call to `consume()`.
Array<const ArrayPtr<const byte>> asArray(uint64_t minBytes, uint64_t& amount);
// Consume the first `minBytes` of the buffer (or the entire buffer) and return it in an Array
// of ArrayPtr<const byte>s, suitable for passing to AsyncOutputStream.write(). The outer Array
// owns the underlying data.
void produce(Array<byte> bytes);
// Enqueue a byte array to the end of the buffer list.
bool empty() const;
uint64_t size() const;
private:
std::deque<Array<byte>> bufferList;
};
class Sink {
public:
struct Need {
// We use uint64_t here because:
// - pumpTo() accepts it as the `amount` parameter.
// - all practical values of tryRead()'s `maxBytes` parameter (a size_t) should also fit into
// a uint64_t, unless we're on a machine with multiple exabytes of memory ...
uint64_t minBytes = 0;
uint64_t maxBytes = kj::maxValue;
};
virtual Promise<void> fill(Buffer& inBuffer, const Maybe<Stoppage>& stoppage) = 0;
// Attempt to fill the sink with bytes andreturn a promise which must resolve before any inner
// read may be attempted. If a sink requires backpressure to be respected, this is how it should
// be communicated.
//
// If the sink is full, it must detach from the tee before the returned promise is resolved.
//
// The returned promise must not result in an exception.
virtual Need need() = 0;
virtual void reject(Exception&& exception) = 0;
// Inform this sink of a catastrophic exception and detach it. Regular read exceptions should be
// propagated through `fill()`'s stoppage parameter instead.
};
template <typename T>
class SinkBase: public Sink {
// Registers itself with the tee as a sink on construction, detaches from the tee on
// fulfillment, rejection, or destruction.
//
// A bit of a Frankenstein, avert your eyes. For one thing, it's more of a mixin than a base...
public:
explicit SinkBase(PromiseFulfiller<T>& fulfiller, Maybe<Sink&>& sinkLink)
: fulfiller(fulfiller), sinkLink(sinkLink) {
KJ_ASSERT(sinkLink == nullptr, "sink initiated with sink already in flight");
sinkLink = *this;
}
KJ_DISALLOW_COPY(SinkBase);
~SinkBase() noexcept(false) { detach(); }
void reject(Exception&& exception) override {
// The tee is allowed to reject this sink if it needs to, e.g. to propagate a non-inner read
// exception from the pull loop. Only the derived class is allowed to fulfill() directly,
// though -- the tee must keep calling fill().
fulfiller.reject(mv(exception));
detach();
}
protected:
template <typename U>
void fulfill(U value) {
fulfiller.fulfill(fwd<U>(value));
detach();
}
private:
void detach() {
KJ_IF_MAYBE(sink, sinkLink) {
if (sink == this) {
sinkLink = nullptr;
}
}
}
PromiseFulfiller<T>& fulfiller;
Maybe<Sink&>& sinkLink;
};
struct Branch {
Buffer buffer;
Maybe<Sink&> sink;
};
class ReadSink final: public SinkBase<size_t> {
public:
explicit ReadSink(PromiseFulfiller<size_t>& fulfiller, Maybe<Sink&>& registration,
ArrayPtr<byte> buffer, size_t minBytes, size_t readSoFar)
: SinkBase(fulfiller, registration), buffer(buffer),
minBytes(minBytes), readSoFar(readSoFar) {}
Promise<void> fill(Buffer& inBuffer, const Maybe<Stoppage>& stoppage) override {
auto amount = inBuffer.consume(buffer, minBytes);
readSoFar += amount;
if (minBytes == 0) {
// We satisfied the read request.
fulfill(readSoFar);
return READY_NOW;
}
if (amount == 0 && inBuffer.empty()) {
// We made no progress on the read request and the buffer is tapped out.
KJ_IF_MAYBE(reason, stoppage) {
if (reason->is<Eof>() || readSoFar > 0) {
// Prefer short read to exception.
fulfill(readSoFar);
} else {
reject(cp(reason->get<Exception>()));
}
return READY_NOW;
}
}
return READY_NOW;
}
Need need() override { return Need { minBytes, buffer.size() }; }
private:
ArrayPtr<byte> buffer;
size_t minBytes;
// Arguments to the outer tryRead() call, sliced/decremented after every buffer consumption.
size_t readSoFar;
// End result of the outer tryRead().
};
class PumpSink final: public SinkBase<uint64_t> {
public:
explicit PumpSink(PromiseFulfiller<uint64_t>& fulfiller, Maybe<Sink&>& registration,
AsyncOutputStream& output, uint64_t limit)
: SinkBase(fulfiller, registration), output(output), limit(limit) {}
~PumpSink() noexcept(false) {
canceler.cancel("This pump has been canceled.");
}
Promise<void> fill(Buffer& inBuffer, const Maybe<Stoppage>& stoppage) override {
KJ_ASSERT(limit > 0);
uint64_t amount = 0;
// TODO(someday): This consumes data from the buffer, but we cannot know if the stream to
// which we're pumping will accept it until after the write() promise completes. If the
// write() promise rejects, we lose this data. We should consume the data from the buffer
// only after successful writes.
auto writeBuffer = inBuffer.asArray(limit, amount);
KJ_ASSERT(limit >= amount);
if (amount > 0) {
Promise<void> promise = nullptr;
try {
promise = canceler.wrap(output.write(writeBuffer).attach(mv(writeBuffer)));
} catch (const Exception& exception) {
reject(cp(exception));
return READY_NOW;
}
promise = promise.then([this, amount]() {
limit -= amount;
pumpedSoFar += amount;
if (limit == 0) {
fulfill(pumpedSoFar);
}
}).eagerlyEvaluate([this](Exception&& exception) {
reject(mv(exception));
});
return mv(promise);
} else KJ_IF_MAYBE(reason, stoppage) {
if (reason->is<Eof>()) {
// Unlike in the read case, it makes more sense to immediately propagate exceptions to the
// pump promise rather than show it a "short pump".
fulfill(pumpedSoFar);
} else {
reject(cp(reason->get<Exception>()));
}
}
return READY_NOW;
}
Need need() override { return Need { 1, limit }; }
private:
AsyncOutputStream& output;
uint64_t limit;
// Arguments to the outer pumpTo() call, decremented after every buffer consumption.
//
// Equal to zero once fulfiller has been fulfilled/rejected.
uint64_t pumpedSoFar = 0;
// End result of the outer pumpTo().
Canceler canceler;
// When the pump is canceled, we also need to cancel any write operations in flight.
};
// =====================================================================================
Maybe<Sink::Need> analyzeSinks() {
// Return nullptr if there are no sinks at all. Otherwise, return the largest `minBytes` and the
// smallest `maxBytes` requested by any sink. The pull loop will use these values to calculate
// the optimal buffer size for the next inner read, so that a minimum amount of data is buffered
// at any given time.
uint64_t minBytes = 0;
uint64_t maxBytes = kj::maxValue;
uint nBranches = 0;
uint nSinks = 0;
for (auto& state: branches) {
KJ_IF_MAYBE(s, state) {
++nBranches;
KJ_IF_MAYBE(sink, s->sink) {
++nSinks;
auto need = sink->need();
minBytes = kj::max(minBytes, need.minBytes);
maxBytes = kj::min(maxBytes, need.maxBytes);
}
}
}
if (nSinks > 0) {
KJ_ASSERT(minBytes > 0);
KJ_ASSERT(maxBytes > 0, "sink was filled but did not detach");
// Sinks may report non-overlapping needs.
maxBytes = kj::max(minBytes, maxBytes);
return Sink::Need { minBytes, maxBytes };
}
// No active sinks.
return nullptr;
}
void ensurePulling() {
if (!pulling) {
pulling = true;
UnwindDetector unwind;
KJ_DEFER(if (unwind.isUnwinding()) pulling = false);
pullPromise = pull();
}
}
Promise<void> pull() {
// Use evalLater() so that two pump sinks added on the same turn of the event loop will not
// cause buffering.
return evalLater([this] {
// Attempt to fill any sinks that exist.
Vector<Promise<void>> promises;
for (auto& state: branches) {
KJ_IF_MAYBE(s, state) {
KJ_IF_MAYBE(sink, s->sink) {
promises.add(sink->fill(s->buffer, stoppage));
}
}
}
// Respect the greatest of the sinks' backpressures.
return joinPromises(promises.releaseAsArray());
}).then([this]() -> Promise<void> {
// Check to see whether we need to perform an inner read.
auto need = analyzeSinks();
if (need == nullptr) {
// No more sinks, stop pulling.
pulling = false;
return READY_NOW;
}
if (stoppage != nullptr) {
// We're eof or errored, don't read, but loop so we can fill the sink(s).
return pull();
}
auto& n = KJ_ASSERT_NONNULL(need);
KJ_ASSERT(n.minBytes > 0);
// We must perform an inner read.
// We'd prefer not to explode our buffer, if that's cool. We cap `maxBytes` to the buffer size
// limit or our builtin MAX_BLOCK_SIZE, whichever is smaller. But, we make sure `maxBytes` is
// still >= `minBytes`.
n.maxBytes = kj::min(n.maxBytes, MAX_BLOCK_SIZE);
n.maxBytes = kj::min(n.maxBytes, bufferSizeLimit);
n.maxBytes = kj::max(n.minBytes, n.maxBytes);
for (auto& state: branches) {
KJ_IF_MAYBE(s, state) {
// TODO(perf): buffer.size() is O(n) where n = # of individual heap-allocated byte arrays.
if (s->buffer.size() + n.maxBytes > bufferSizeLimit) {
stoppage = Stoppage(KJ_EXCEPTION(FAILED, "tee buffer size limit exceeded"));
return pull();
}
}
}
auto heapBuffer = heapArray<byte>(n.maxBytes);
// gcc 4.9 quirk: If I don't hoist this into a separate variable and instead call
//
// inner->tryRead(heapBuffer.begin(), n.minBytes, heapBuffer.size())
//
// `heapBuffer` seems to get moved into the lambda capture before the arguments to `tryRead()`
// are evaluated, meaning `inner` sees a nullptr destination. Bizarrely, `inner` sees the
// correct value for `heapBuffer.size()`... I dunno, man.
auto destination = heapBuffer.begin();
try {
return inner->tryRead(destination, n.minBytes, n.maxBytes)
.then([this, heapBuffer = mv(heapBuffer), minBytes = n.minBytes](size_t amount) mutable
-> Promise<void> {
length = length.map([amount](uint64_t n) {
KJ_ASSERT(n >= amount);
return n - amount;
});
if (amount < heapBuffer.size()) {
heapBuffer = heapBuffer.slice(0, amount).attach(mv(heapBuffer));
}
KJ_ASSERT(stoppage == nullptr);
Maybe<ArrayPtr<byte>> bufferPtr = nullptr;
for (auto& state: branches) {
KJ_IF_MAYBE(s, state) {
// Prefer to move the buffer into the receiving branch's deque, rather than memcpy.
//
// TODO(perf): For the 2-branch case, this is fine, since the majority of the time
// only one buffer will be in use. If we generalize to the n-branch case, this would
// become memcpy-heavy.
KJ_IF_MAYBE(ptr, bufferPtr) {
s->buffer.produce(heapArray(*ptr));
} else {
bufferPtr = ArrayPtr<byte>(heapBuffer);
s->buffer.produce(mv(heapBuffer));
}
}
}
if (amount < minBytes) {
// Short read, EOF.
stoppage = Stoppage(Eof());
}
return pull();
}, [this](Exception&& exception) {
// Exception from the inner tryRead(). Propagate.
stoppage = Stoppage(mv(exception));
return pull();
});
} catch (const Exception& exception) {
// Exception from the inner tryRead(). Propagate.
stoppage = Stoppage(cp(exception));
return pull();
}
}).eagerlyEvaluate([this](Exception&& exception) {
// Exception from our loop, not from inner tryRead(). Something is broken; tell everybody!
pulling = false;
for (auto& state: branches) {
KJ_IF_MAYBE(s, state) {
KJ_IF_MAYBE(sink, s->sink) {
sink->reject(KJ_EXCEPTION(FAILED, "Exception in tee loop", exception));
}
}
}
});
}
constexpr static size_t MAX_BLOCK_SIZE = 1 << 14; // 16k
Own<AsyncInputStream> inner;
const uint64_t bufferSizeLimit = kj::maxValue;
Maybe<uint64_t> length;
Maybe<Branch> branches[2];
Maybe<Stoppage> stoppage;
Promise<void> pullPromise = READY_NOW;
bool pulling = false;
};
constexpr size_t AsyncTee::MAX_BLOCK_SIZE;
uint64_t AsyncTee::Buffer::consume(ArrayPtr<byte>& readBuffer, size_t& minBytes) {
uint64_t totalAmount = 0;
while (readBuffer.size() > 0 && !bufferList.empty()) {
auto& bytes = bufferList.front();
auto amount = kj::min(bytes.size(), readBuffer.size());
memcpy(readBuffer.begin(), bytes.begin(), amount);
totalAmount += amount;
readBuffer = readBuffer.slice(amount, readBuffer.size());
minBytes -= kj::min(amount, minBytes);
if (amount == bytes.size()) {
bufferList.pop_front();
} else {
bytes = heapArray(bytes.slice(amount, bytes.size()));
return totalAmount;
}
}
return totalAmount;
}
void AsyncTee::Buffer::produce(Array<byte> bytes) {
bufferList.push_back(mv(bytes));
}
Array<const ArrayPtr<const byte>> AsyncTee::Buffer::asArray(
uint64_t maxBytes, uint64_t& amount) {
amount = 0;
Vector<ArrayPtr<const byte>> buffers;
Vector<Array<byte>> ownBuffers;
while (maxBytes > 0 && !bufferList.empty()) {
auto& bytes = bufferList.front();
if (bytes.size() <= maxBytes) {
amount += bytes.size();
maxBytes -= bytes.size();
buffers.add(bytes);
ownBuffers.add(mv(bytes));
bufferList.pop_front();
} else {
auto ownBytes = heapArray(bytes.slice(0, maxBytes));
buffers.add(ownBytes);
ownBuffers.add(mv(ownBytes));
bytes = heapArray(bytes.slice(maxBytes, bytes.size()));
amount += maxBytes;
maxBytes = 0;
}
}
if (buffers.size() > 0) {
return buffers.releaseAsArray().attach(mv(ownBuffers));
}
return {};
}
bool AsyncTee::Buffer::empty() const {
return bufferList.empty();
}
uint64_t AsyncTee::Buffer::size() const {
uint64_t result = 0;
for (auto& bytes: bufferList) {
result += bytes.size();
}
return result;
}
class TeeBranch final: public AsyncInputStream {
public:
TeeBranch(Own<AsyncTee> tee, uint8_t branch): tee(mv(tee)), branch(branch) {
this->tee->addBranch(branch);
}
~TeeBranch() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() {
tee->removeBranch(branch);
});
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return tee->tryRead(branch, buffer, minBytes, maxBytes);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return tee->pumpTo(branch, output, amount);
}
Maybe<uint64_t> tryGetLength() override {
return tee->tryGetLength(branch);
}
private:
Own<AsyncTee> tee;
const uint8_t branch;
UnwindDetector unwind;
};
} // namespace
Tee newTee(Own<AsyncInputStream> input, uint64_t limit) {
auto impl = refcounted<AsyncTee>(mv(input), limit);
Own<AsyncInputStream> branch1 = heap<TeeBranch>(addRef(*impl), 0);
Own<AsyncInputStream> branch2 = heap<TeeBranch>(mv(impl), 1);
return { { mv(branch1), mv(branch2) } };
}
Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() { Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() {
return tryReceiveStream() return tryReceiveStream()
.then([](Maybe<Own<AsyncCapabilityStream>>&& result) .then([](Maybe<Own<AsyncCapabilityStream>>&& result)
......
...@@ -203,6 +203,30 @@ struct CapabilityPipe { ...@@ -203,6 +203,30 @@ struct CapabilityPipe {
Own<AsyncCapabilityStream> ends[2]; Own<AsyncCapabilityStream> ends[2];
}; };
struct Tee {
// Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream.
Own<AsyncInputStream> branches[2];
};
Tee newTee(Own<AsyncInputStream> input, uint64_t limit = kj::maxValue);
// Constructs a Tee that operates in-process. The tee buffers data if any read or pump operations is
// called on one of the two input ends. If a read or pump operation is subsequently called on the
// other input end, the buffered data is consumed.
//
// `pumpTo()` operations on the input ends will proactively read from the inner stream and block
// while writing to the output stream. While one branch has an active `pumpTo()` operation, any
// `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the
// pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if
// there are `pumpTo()` operations active on both branches, the greater of the two backpressures is
// respected -- the two pumps progress in lockstep, and there is no buffering.
//
// At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would
// grow beyond the limit, an exception is generated, which both branches see once they have
// exhausted their buffers.
//
// It is recommended that you use a more conservative value for `limit` than the default.
class ConnectionReceiver { class ConnectionReceiver {
// Represents a server socket listening on a port. // Represents a server socket listening on a port.
......
...@@ -37,12 +37,6 @@ struct UrlOptions { ...@@ -37,12 +37,6 @@ struct UrlOptions {
bool percentDecode = true; bool percentDecode = true;
// True if URL components should be automatically percent-decoded during parsing, and // True if URL components should be automatically percent-decoded during parsing, and
// percent-encoded during serialization. // percent-encoded during serialization.
#if __cplusplus < 201402L
inline constexpr UrlOptions(bool percentDecode = true): percentDecode(percentDecode) {}
// TODO(cleanup): This constructor is only here to support brace initialization in C++11. It
// should be removed once we upgrade to C++14.
#endif
}; };
struct Url { struct Url {
...@@ -103,17 +97,6 @@ struct Url { ...@@ -103,17 +97,6 @@ struct Url {
~Url() noexcept(false); ~Url() noexcept(false);
Url& operator=(Url&&) = default; Url& operator=(Url&&) = default;
#if __cplusplus < 201402L
inline Url(String&& scheme, Maybe<UserInfo>&& userInfo, String&& host, Vector<String>&& path,
bool hasTrailingSlash, Vector<QueryParam>&& query, Maybe<String>&& fragment,
UrlOptions options)
: scheme(kj::mv(scheme)), userInfo(kj::mv(userInfo)), host(kj::mv(host)), path(kj::mv(path)),
hasTrailingSlash(hasTrailingSlash), query(kj::mv(query)), fragment(kj::mv(fragment)),
options(options) {}
// TODO(cleanup): This constructor is only here to support brace initialization in C++11. It
// should be removed once we upgrade to C++14.
#endif
Url clone() const; Url clone() const;
enum Context { enum Context {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment