Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
3c7748b7
Commit
3c7748b7
authored
Aug 30, 2018
by
Harris Hancock
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Implement newTee()
parent
e2744f24
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
1377 additions
and
1 deletion
+1377
-1
async-io-test.c++
c++/src/kj/async-io-test.c++
+729
-1
async-io.c++
c++/src/kj/async-io.c++
+624
-0
async-io.h
c++/src/kj/async-io.h
+24
-0
No files found.
c++/src/kj/async-io-test.c++
View file @
3c7748b7
...
...
@@ -681,7 +681,7 @@ kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
}));
}
class
MockAsyncInputStream
:
public
AsyncInputStream
{
class
MockAsyncInputStream
final
:
public
AsyncInputStream
{
public
:
MockAsyncInputStream
(
kj
::
ArrayPtr
<
const
byte
>
bytes
,
size_t
blockSize
)
:
bytes
(
bytes
),
blockSize
(
blockSize
)
{}
...
...
@@ -1375,5 +1375,733 @@ KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
pipe2
.
out
=
nullptr
;
}
constexpr
static
auto
TEE_MAX_CHUNK_SIZE
=
1
<<
14
;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
KJ_TEST
(
"Userland tee"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
tee
=
newTee
(
kj
::
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
expectRead
(
*
left
,
"foobar"
).
wait
(
ws
);
writePromise
.
wait
(
ws
);
expectRead
(
*
right
,
"foobar"
).
wait
(
ws
);
}
KJ_TEST
(
"Userland tee concurrent read"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
tee
=
newTee
(
kj
::
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
uint8_t
leftBuf
[
6
]
=
{
0
};
uint8_t
rightBuf
[
6
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
6
,
6
);
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
6
,
6
);
KJ_EXPECT
(
!
leftPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPromise
.
poll
(
ws
));
pipe
.
out
->
write
(
"foobar"
,
6
).
wait
(
ws
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
"foobar"
,
6
)
==
0
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
"foobar"
,
6
)
==
0
);
}
KJ_TEST
(
"Userland tee cancel and restart read"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
tee
=
newTee
(
kj
::
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
{
// Initiate a read and immediately cancel it.
uint8_t
buf
[
6
]
=
{
0
};
auto
promise
=
left
->
tryRead
(
buf
,
6
,
6
);
}
// Subsequent reads still see the full data.
expectRead
(
*
left
,
"foobar"
).
wait
(
ws
);
writePromise
.
wait
(
ws
);
expectRead
(
*
right
,
"foobar"
).
wait
(
ws
);
}
KJ_TEST
(
"Userland tee cancel read and destroy branch then read other branch"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
tee
=
newTee
(
kj
::
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
{
// Initiate a read and immediately cancel it.
uint8_t
buf
[
6
]
=
{
0
};
auto
promise
=
left
->
tryRead
(
buf
,
6
,
6
);
}
// And destroy the branch for good measure.
left
=
nullptr
;
// Subsequent reads on the other branch still see the full data.
expectRead
(
*
right
,
"foobar"
).
wait
(
ws
);
writePromise
.
wait
(
ws
);
}
KJ_TEST
(
"Userland tee subsequent other-branch reads are READY_NOW"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
tee
=
newTee
(
kj
::
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
uint8_t
leftBuf
[
6
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
6
,
6
);
// This is the first read, so there should NOT be buffered data.
KJ_EXPECT
(
!
leftPromise
.
poll
(
ws
));
pipe
.
out
->
write
(
"foobar"
,
6
).
wait
(
ws
);
leftPromise
.
wait
(
ws
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
"foobar"
,
6
)
==
0
);
uint8_t
rightBuf
[
6
]
=
{
0
};
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
6
,
6
);
// The left read promise was fulfilled, so there SHOULD be buffered data.
KJ_EXPECT
(
rightPromise
.
poll
(
ws
));
rightPromise
.
wait
(
ws
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
"foobar"
,
6
)
==
0
);
}
KJ_TEST
(
"Userland tee read EOF propagation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
pipe
=
newOneWayPipe
();
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
// Lengthless pipe, so ...
KJ_EXPECT
(
left
->
tryGetLength
()
==
nullptr
);
KJ_EXPECT
(
right
->
tryGetLength
()
==
nullptr
);
uint8_t
leftBuf
[
7
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
size
(
leftBuf
),
size
(
leftBuf
));
writePromise
.
wait
(
ws
);
// Destroying the output side should force a short read.
pipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
"foobar"
,
6
)
==
0
);
// And we should see a short read here, too.
uint8_t
rightBuf
[
7
]
=
{
0
};
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
size
(
rightBuf
),
size
(
rightBuf
));
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
"foobar"
,
6
)
==
0
);
// Further reads should all be short.
KJ_EXPECT
(
left
->
tryRead
(
leftBuf
,
1
,
size
(
leftBuf
)).
wait
(
ws
)
==
0
);
KJ_EXPECT
(
right
->
tryRead
(
rightBuf
,
1
,
size
(
rightBuf
)).
wait
(
ws
)
==
0
);
}
KJ_TEST
(
"Userland tee read exception propagation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
// Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
// ended prematurely" exception when we destroy the output side early.
auto
pipe
=
newOneWayPipe
(
7
);
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
// Test tryGetLength() while we're at it.
KJ_EXPECT
(
KJ_ASSERT_NONNULL
(
left
->
tryGetLength
())
==
7
);
KJ_EXPECT
(
KJ_ASSERT_NONNULL
(
right
->
tryGetLength
())
==
7
);
uint8_t
leftBuf
[
7
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
6
,
size
(
leftBuf
));
writePromise
.
wait
(
ws
);
// Destroying the output side should force a fulfillment of the read (since we reached minBytes).
pipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
"foobar"
,
6
)
==
0
);
// The next read sees the exception.
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
left
->
tryRead
(
leftBuf
,
1
,
size
(
leftBuf
)).
wait
(
ws
));
// Test tryGetLength() here -- the unread branch still sees the original length value.
KJ_EXPECT
(
KJ_ASSERT_NONNULL
(
left
->
tryGetLength
())
==
1
);
KJ_EXPECT
(
KJ_ASSERT_NONNULL
(
right
->
tryGetLength
())
==
7
);
// We should see the buffered data on the other side, even though we don't reach our minBytes.
uint8_t
rightBuf
[
7
]
=
{
0
};
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
size
(
rightBuf
),
size
(
rightBuf
));
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
"foobar"
,
6
)
==
0
);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
right
->
tryRead
(
rightBuf
,
1
,
size
(
leftBuf
)).
wait
(
ws
));
// Further reads should all see the exception again.
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
left
->
tryRead
(
leftBuf
,
1
,
size
(
leftBuf
)).
wait
(
ws
));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
right
->
tryRead
(
rightBuf
,
1
,
size
(
leftBuf
)).
wait
(
ws
));
}
KJ_TEST
(
"Userland tee read exception propagation w/ data loss"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
// Make a pipe expecting to read more than we're actually going to write. This will force a "pipe
// ended prematurely" exception once the pipe sees a short read.
auto
pipe
=
newOneWayPipe
(
7
);
auto
writePromise
=
pipe
.
out
->
write
(
"foobar"
,
6
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
uint8_t
leftBuf
[
7
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
7
,
7
);
writePromise
.
wait
(
ws
);
// Destroying the output side should force an exception, since we didn't reach our minBytes.
pipe
.
out
=
nullptr
;
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
leftPromise
.
wait
(
ws
));
// And we should see a short read here, too. In fact, we shouldn't see anything: the short read
// above read all of the pipe's data, but then failed to buffer it because it encountered an
// exception. It buffered the exception, instead.
uint8_t
rightBuf
[
7
]
=
{
0
};
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
right
->
tryRead
(
rightBuf
,
1
,
1
).
wait
(
ws
));
}
KJ_TEST
(
"Userland tee read into different buffer sizes"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
"foo bar baz"
_kj
.
asBytes
(),
11
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
uint8_t
leftBuf
[
5
]
=
{
0
};
uint8_t
rightBuf
[
11
]
=
{
0
};
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
5
,
5
);
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
11
,
11
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
5
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
11
);
}
KJ_TEST
(
"Userland tee reads see max(minBytes...) and min(maxBytes...)"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
"foo bar baz"
_kj
.
asBytes
(),
11
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
{
uint8_t
leftBuf
[
5
]
=
{
0
};
uint8_t
rightBuf
[
11
]
=
{
0
};
// Subrange of another range. The smaller maxBytes should win.
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
3
,
5
);
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
1
,
11
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
5
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
5
);
}
{
uint8_t
leftBuf
[
5
]
=
{
0
};
uint8_t
rightBuf
[
11
]
=
{
0
};
// Disjoint ranges. The larger minBytes should win.
auto
leftPromise
=
left
->
tryRead
(
leftBuf
,
3
,
5
);
auto
rightPromise
=
right
->
tryRead
(
rightBuf
,
6
,
11
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
5
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
6
);
KJ_EXPECT
(
left
->
tryRead
(
leftBuf
,
1
,
2
).
wait
(
ws
)
==
1
);
}
}
KJ_TEST
(
"Userland tee read stress test"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
bigText
=
strArray
(
kj
::
repeat
(
"foo bar baz"
_kj
,
12345
),
","
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
bigText
.
asBytes
(),
bigText
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftBuffer
=
heapArray
<
byte
>
(
bigText
.
size
());
{
auto
leftSlice
=
leftBuffer
.
slice
(
0
,
leftBuffer
.
size
());
while
(
leftSlice
.
size
()
>
0
)
{
for
(
size_t
blockSize
:
{
2
,
3
,
5
,
7
,
11
,
13
,
17
,
19
,
23
,
29
,
31
,
37
,
41
,
43
,
47
,
53
,
59
})
{
if
(
leftSlice
.
size
()
==
0
)
break
;
auto
maxBytes
=
min
(
blockSize
,
leftSlice
.
size
());
auto
amount
=
left
->
tryRead
(
leftSlice
.
begin
(),
1
,
maxBytes
).
wait
(
ws
);
leftSlice
=
leftSlice
.
slice
(
amount
,
leftSlice
.
size
());
}
}
}
KJ_EXPECT
(
memcmp
(
leftBuffer
.
begin
(),
bigText
.
begin
(),
leftBuffer
.
size
())
==
0
);
KJ_EXPECT
(
right
->
readAllText
().
wait
(
ws
)
==
bigText
);
}
KJ_TEST
(
"Userland tee pump"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
bigText
=
strArray
(
kj
::
repeat
(
"foo bar baz"
_kj
,
12345
),
","
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
bigText
.
asBytes
(),
bigText
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
,
7
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
// Neither are ready yet, because the left pump's backpressure has blocked the AsyncTee's pull
// loop until we read from leftPipe.
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
expectRead
(
*
leftPipe
.
in
,
"foo bar"
).
wait
(
ws
);
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
7
);
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
// We should be able to read up to how far the left side pumped, and beyond. The left side will
// now have data in its buffer.
expectRead
(
*
rightPipe
.
in
,
"foo bar baz,foo bar baz,foo"
).
wait
(
ws
);
// Consume the left side buffer.
expectRead
(
*
left
,
" baz,foo bar"
).
wait
(
ws
);
// We can destroy the left branch entirely and the right branch will still see all data.
left
=
nullptr
;
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
auto
allTextPromise
=
rightPipe
.
in
->
readAllText
();
KJ_EXPECT
(
rightPumpPromise
.
wait
(
ws
)
==
bigText
.
size
());
// Need to force an EOF in the right pipe to check the result.
rightPipe
.
out
=
nullptr
;
KJ_EXPECT
(
allTextPromise
.
wait
(
ws
)
==
bigText
.
slice
(
27
));
}
KJ_TEST
(
"Userland tee pump slows down reads"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
bigText
=
strArray
(
kj
::
repeat
(
"foo bar baz"
_kj
,
12345
),
","
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
bigText
.
asBytes
(),
bigText
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
// The left pump will cause some data to be buffered on the right branch, which we can read.
auto
rightExpectation0
=
kj
::
str
(
bigText
.
slice
(
0
,
TEE_MAX_CHUNK_SIZE
));
expectRead
(
*
right
,
rightExpectation0
).
wait
(
ws
);
// But the next right branch read is blocked by the left pipe's backpressure.
auto
rightExpectation1
=
kj
::
str
(
bigText
.
slice
(
TEE_MAX_CHUNK_SIZE
,
TEE_MAX_CHUNK_SIZE
+
10
));
auto
rightPromise
=
expectRead
(
*
right
,
rightExpectation1
);
KJ_EXPECT
(
!
rightPromise
.
poll
(
ws
));
// The right branch read finishes when we relieve the pressure in the left pipe.
auto
allTextPromise
=
leftPipe
.
in
->
readAllText
();
rightPromise
.
wait
(
ws
);
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
bigText
.
size
());
leftPipe
.
out
=
nullptr
;
KJ_EXPECT
(
allTextPromise
.
wait
(
ws
)
==
bigText
);
}
KJ_TEST
(
"Userland tee pump EOF propagation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
{
// EOF encountered by two pump operations.
auto
pipe
=
newOneWayPipe
();
auto
writePromise
=
pipe
.
out
->
write
(
"foo bar"
,
7
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
// Pump the first bit, and block.
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
writePromise
.
wait
(
ws
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
// Induce an EOF. We should see it propagated to both pump promises.
pipe
.
out
=
nullptr
;
// Relieve backpressure.
auto
leftAllPromise
=
leftPipe
.
in
->
readAllText
();
auto
rightAllPromise
=
rightPipe
.
in
->
readAllText
();
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
7
);
KJ_EXPECT
(
rightPumpPromise
.
wait
(
ws
)
==
7
);
// Make sure we got the data on the pipes that were being pumped to.
KJ_EXPECT
(
!
leftAllPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightAllPromise
.
poll
(
ws
));
leftPipe
.
out
=
nullptr
;
rightPipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftAllPromise
.
wait
(
ws
)
==
"foo bar"
);
KJ_EXPECT
(
rightAllPromise
.
wait
(
ws
)
==
"foo bar"
);
}
{
// EOF encountered by a read and pump operation.
auto
pipe
=
newOneWayPipe
();
auto
writePromise
=
pipe
.
out
->
write
(
"foo bar"
,
7
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
// Pump one branch, read another.
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
expectRead
(
*
right
,
"foo bar"
).
wait
(
ws
);
writePromise
.
wait
(
ws
);
uint8_t
dummy
=
0
;
auto
rightReadPromise
=
right
->
tryRead
(
&
dummy
,
1
,
1
);
// Induce an EOF. We should see it propagated to both the read and pump promises.
pipe
.
out
=
nullptr
;
// Relieve backpressure in the tee to see the EOF.
auto
leftAllPromise
=
leftPipe
.
in
->
readAllText
();
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
7
);
KJ_EXPECT
(
rightReadPromise
.
wait
(
ws
)
==
0
);
// Make sure we got the data on the pipe that was being pumped to.
KJ_EXPECT
(
!
leftAllPromise
.
poll
(
ws
));
leftPipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftAllPromise
.
wait
(
ws
)
==
"foo bar"
);
}
}
KJ_TEST
(
"Userland tee pump EOF on chunk boundary"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
bigText
=
strArray
(
kj
::
repeat
(
"foo bar baz"
_kj
,
12345
),
","
);
// Conjure an EOF right on the boundary of the tee's internal chunk.
auto
chunkText
=
kj
::
str
(
bigText
.
slice
(
0
,
TEE_MAX_CHUNK_SIZE
));
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
chunkText
.
asBytes
(),
chunkText
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
auto
leftAllPromise
=
leftPipe
.
in
->
readAllText
();
auto
rightAllPromise
=
rightPipe
.
in
->
readAllText
();
// The pumps should see the EOF and stop.
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
TEE_MAX_CHUNK_SIZE
);
KJ_EXPECT
(
rightPumpPromise
.
wait
(
ws
)
==
TEE_MAX_CHUNK_SIZE
);
// Verify that we saw the data on the other end of the destination pipes.
leftPipe
.
out
=
nullptr
;
rightPipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftAllPromise
.
wait
(
ws
)
==
chunkText
);
KJ_EXPECT
(
rightAllPromise
.
wait
(
ws
)
==
chunkText
);
}
KJ_TEST
(
"Userland tee pump read exception propagation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
{
// Exception encountered by two pump operations.
auto
pipe
=
newOneWayPipe
(
14
);
auto
writePromise
=
pipe
.
out
->
write
(
"foo bar"
,
7
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
// Pump the first bit, and block.
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
writePromise
.
wait
(
ws
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
// Induce a read exception. We should see it propagated to both pump promises.
pipe
.
out
=
nullptr
;
// Both promises must exist before the backpressure in the tee is relieved, and the tee pull
// loop actually sees the exception.
auto
leftAllPromise
=
leftPipe
.
in
->
readAllText
();
auto
rightAllPromise
=
rightPipe
.
in
->
readAllText
();
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
leftPumpPromise
.
wait
(
ws
));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
rightPumpPromise
.
wait
(
ws
));
// Make sure we got the data on the destination pipes.
KJ_EXPECT
(
!
leftAllPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightAllPromise
.
poll
(
ws
));
leftPipe
.
out
=
nullptr
;
rightPipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftAllPromise
.
wait
(
ws
)
==
"foo bar"
);
KJ_EXPECT
(
rightAllPromise
.
wait
(
ws
)
==
"foo bar"
);
}
{
// Exception encountered by a read and pump operation.
auto
pipe
=
newOneWayPipe
(
14
);
auto
writePromise
=
pipe
.
out
->
write
(
"foo bar"
,
7
);
auto
tee
=
newTee
(
mv
(
pipe
.
in
));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
// Pump one branch, read another.
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
expectRead
(
*
right
,
"foo bar"
).
wait
(
ws
);
writePromise
.
wait
(
ws
);
uint8_t
dummy
=
0
;
auto
rightReadPromise
=
right
->
tryRead
(
&
dummy
,
1
,
1
);
// Induce a read exception. We should see it propagated to both the read and pump promises.
pipe
.
out
=
nullptr
;
// Relieve backpressure in the tee to see the exceptions.
auto
leftAllPromise
=
leftPipe
.
in
->
readAllText
();
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
leftPumpPromise
.
wait
(
ws
));
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"pipe ended prematurely"
,
rightReadPromise
.
wait
(
ws
));
// Make sure we got the data on the destination pipe.
KJ_EXPECT
(
!
leftAllPromise
.
poll
(
ws
));
leftPipe
.
out
=
nullptr
;
KJ_EXPECT
(
leftAllPromise
.
wait
(
ws
)
==
"foo bar"
);
}
}
KJ_TEST
(
"Userland tee pump write exception propagation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
bigText
=
strArray
(
kj
::
repeat
(
"foo bar baz"
_kj
,
12345
),
","
);
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
bigText
.
asBytes
(),
bigText
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
// Set up two pumps and let them block.
auto
leftPipe
=
newOneWayPipe
();
auto
rightPipe
=
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
// Induce a write exception in the right branch pump. It should propagate to the right pump
// promise.
rightPipe
.
in
=
nullptr
;
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"read end of pipe was aborted"
,
rightPumpPromise
.
wait
(
ws
));
// The left pump promise does not see the right branch's write exception.
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
auto
allTextPromise
=
leftPipe
.
in
->
readAllText
();
KJ_EXPECT
(
leftPumpPromise
.
wait
(
ws
)
==
bigText
.
size
());
leftPipe
.
out
=
nullptr
;
KJ_EXPECT
(
allTextPromise
.
wait
(
ws
)
==
bigText
);
}
KJ_TEST
(
"Userland tee pump cancellation implies write cancellation"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
text
=
"foo bar baz"
_kj
;
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
text
.
asBytes
(),
text
.
size
()));
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
// Arrange to block the left pump on its write operation.
expectRead
(
*
right
,
"foo "
).
wait
(
ws
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
// Then cancel the pump, while it's still blocked.
leftPumpPromise
=
nullptr
;
// It should cancel its write operations, so it should now be safe to destroy the output stream to
// which it was pumping.
try
{
leftPipe
.
out
=
nullptr
;
}
catch
(
const
Exception
&
exception
)
{
KJ_FAIL_EXPECT
(
"write promises were not canceled"
,
exception
);
}
}
KJ_TEST
(
"Userland tee buffer size limit"
)
{
kj
::
EventLoop
loop
;
WaitScope
ws
(
loop
);
auto
text
=
"foo bar baz"
_kj
;
{
// We can carefully read data to stay under our ridiculously low limit.
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
text
.
asBytes
(),
text
.
size
()),
2
);
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
expectRead
(
*
left
,
"fo"
).
wait
(
ws
);
expectRead
(
*
right
,
"foo "
).
wait
(
ws
);
expectRead
(
*
left
,
"o ba"
).
wait
(
ws
);
expectRead
(
*
right
,
"bar "
).
wait
(
ws
);
expectRead
(
*
left
,
"r ba"
).
wait
(
ws
);
expectRead
(
*
right
,
"baz"
).
wait
(
ws
);
expectRead
(
*
left
,
"z"
).
wait
(
ws
);
}
{
// Exceeding the limit causes both branches to see the exception after exhausting their buffers.
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
text
.
asBytes
(),
text
.
size
()),
2
);
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
expectRead
(
*
left
,
"fo"
).
wait
(
ws
);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"tee buffer size limit exceeded"
,
expectRead
(
*
left
,
"o"
).
wait
(
ws
));
expectRead
(
*
right
,
"fo"
).
wait
(
ws
);
KJ_EXPECT_THROW_RECOVERABLE_MESSAGE
(
"tee buffer size limit exceeded"
,
expectRead
(
*
right
,
"o"
).
wait
(
ws
));
}
{
// We guarantee that two pumps started simultaneously will never exceed our buffer size limit.
auto
tee
=
newTee
(
heap
<
MockAsyncInputStream
>
(
text
.
asBytes
(),
text
.
size
()),
2
);
auto
left
=
kj
::
mv
(
tee
.
branches
[
0
]);
auto
right
=
kj
::
mv
(
tee
.
branches
[
1
]);
auto
leftPipe
=
kj
::
newOneWayPipe
();
auto
rightPipe
=
kj
::
newOneWayPipe
();
auto
leftPumpPromise
=
left
->
pumpTo
(
*
leftPipe
.
out
);
auto
rightPumpPromise
=
right
->
pumpTo
(
*
rightPipe
.
out
);
KJ_EXPECT
(
!
leftPumpPromise
.
poll
(
ws
));
KJ_EXPECT
(
!
rightPumpPromise
.
poll
(
ws
));
uint8_t
leftBuf
[
11
]
=
{
0
};
uint8_t
rightBuf
[
11
]
=
{
0
};
// The first read on the left pipe will succeed.
auto
leftPromise
=
leftPipe
.
in
->
tryRead
(
leftBuf
,
1
,
11
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
2
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
text
.
begin
(),
2
)
==
0
);
// But the second will block until we relieve pressure on the right pipe.
leftPromise
=
leftPipe
.
in
->
tryRead
(
leftBuf
+
2
,
1
,
9
);
KJ_EXPECT
(
!
leftPromise
.
poll
(
ws
));
// Relieve the right pipe pressure ...
auto
rightPromise
=
rightPipe
.
in
->
tryRead
(
rightBuf
,
1
,
11
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
2
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
text
.
begin
(),
2
)
==
0
);
// Now the second left pipe read will complete.
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
2
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
text
.
begin
(),
4
)
==
0
);
// Leapfrog the left branch with the right. There should be 2 bytes in the buffer, so we can
// demand a total of 4.
rightPromise
=
rightPipe
.
in
->
tryRead
(
rightBuf
+
2
,
4
,
9
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
4
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
text
.
begin
(),
6
)
==
0
);
// Leapfrog the right with the left. We demand the entire rest of the stream, so this should
// block. Note that a regular read for this amount on one of the tee branches directly would
// exceed our buffer size limit, but this one does not, because we have the pipe to regulate
// backpressure for us.
leftPromise
=
leftPipe
.
in
->
tryRead
(
leftBuf
+
4
,
7
,
7
);
KJ_EXPECT
(
!
leftPromise
.
poll
(
ws
));
// Ask for the entire rest of the stream on the right branch and wrap things up.
rightPromise
=
rightPipe
.
in
->
tryRead
(
rightBuf
+
6
,
5
,
5
);
KJ_EXPECT
(
leftPromise
.
wait
(
ws
)
==
7
);
KJ_EXPECT
(
memcmp
(
leftBuf
,
text
.
begin
(),
11
)
==
0
);
KJ_EXPECT
(
rightPromise
.
wait
(
ws
)
==
5
);
KJ_EXPECT
(
memcmp
(
rightBuf
,
text
.
begin
(),
11
)
==
0
);
}
}
}
// namespace
}
// namespace kj
c++/src/kj/async-io.c++
View file @
3c7748b7
...
...
@@ -31,6 +31,7 @@
#include "vector.h"
#include "io.h"
#include "one-of.h"
#include <deque>
#if _WIN32
#include <winsock2.h>
...
...
@@ -1125,6 +1126,629 @@ TwoWayPipe newTwoWayPipe() {
return
{
{
kj
::
mv
(
end1
),
kj
::
mv
(
end2
)
}
};
}
namespace
{
class
AsyncTee
final
:
public
Refcounted
{
public
:
using
BranchId
=
uint
;
explicit
AsyncTee
(
Own
<
AsyncInputStream
>
inner
,
uint64_t
bufferSizeLimit
)
:
inner
(
mv
(
inner
)),
bufferSizeLimit
(
bufferSizeLimit
),
length
(
this
->
inner
->
tryGetLength
())
{}
~
AsyncTee
()
noexcept
(
false
)
{
bool
hasBranches
=
false
;
for
(
auto
&
branch
:
branches
)
{
hasBranches
=
hasBranches
||
branch
!=
nullptr
;
}
KJ_ASSERT
(
!
hasBranches
,
"destroying AsyncTee with branch still alive"
)
{
// Don't std::terminate().
break
;
}
}
void
addBranch
(
BranchId
branch
)
{
KJ_REQUIRE
(
branches
[
branch
]
==
nullptr
,
"branch already exists"
);
branches
[
branch
]
=
Branch
();
}
void
removeBranch
(
BranchId
branch
)
{
auto
&
state
=
KJ_REQUIRE_NONNULL
(
branches
[
branch
],
"branch was already destroyed"
);
KJ_REQUIRE
(
state
.
sink
==
nullptr
,
"destroying tee branch with operation still in-progress; probably going to segfault"
)
{
// Don't std::terminate().
break
;
}
branches
[
branch
]
=
nullptr
;
}
Promise
<
size_t
>
tryRead
(
BranchId
branch
,
void
*
buffer
,
size_t
minBytes
,
size_t
maxBytes
)
{
auto
&
state
=
KJ_ASSERT_NONNULL
(
branches
[
branch
]);
KJ_ASSERT
(
state
.
sink
==
nullptr
);
// If there is excess data in the buffer for us, slurp that up.
auto
readBuffer
=
arrayPtr
(
reinterpret_cast
<
byte
*>
(
buffer
),
maxBytes
);
auto
readSoFar
=
state
.
buffer
.
consume
(
readBuffer
,
minBytes
);
if
(
minBytes
==
0
)
{
return
readSoFar
;
}
if
(
state
.
buffer
.
empty
())
{
KJ_IF_MAYBE
(
reason
,
stoppage
)
{
// Prefer a short read to an exception. The exception prevents the pull loop from adding any
// data to the buffer, so `readSoFar` will be zero the next time someone calls `tryRead()`,
// and the caller will see the exception.
if
(
reason
->
is
<
Eof
>
()
||
readSoFar
>
0
)
{
return
readSoFar
;
}
return
cp
(
reason
->
get
<
Exception
>
());
}
}
auto
promise
=
newAdaptedPromise
<
size_t
,
ReadSink
>
(
state
.
sink
,
readBuffer
,
minBytes
,
readSoFar
);
ensurePulling
();
return
mv
(
promise
);
}
Maybe
<
uint64_t
>
tryGetLength
(
BranchId
branch
)
{
auto
&
state
=
KJ_ASSERT_NONNULL
(
branches
[
branch
]);
return
length
.
map
([
&
state
](
uint64_t
amount
)
{
return
amount
+
state
.
buffer
.
size
();
});
}
Promise
<
uint64_t
>
pumpTo
(
BranchId
branch
,
AsyncOutputStream
&
output
,
uint64_t
amount
)
{
auto
&
state
=
KJ_ASSERT_NONNULL
(
branches
[
branch
]);
KJ_ASSERT
(
state
.
sink
==
nullptr
);
if
(
amount
==
0
)
{
return
amount
;
}
if
(
state
.
buffer
.
empty
())
{
KJ_IF_MAYBE
(
reason
,
stoppage
)
{
if
(
reason
->
is
<
Eof
>
())
{
return
uint64_t
(
0
);
}
return
cp
(
reason
->
get
<
Exception
>
());
}
}
auto
promise
=
newAdaptedPromise
<
uint64_t
,
PumpSink
>
(
state
.
sink
,
output
,
amount
);
ensurePulling
();
return
mv
(
promise
);
}
private
:
struct
Eof
{};
using
Stoppage
=
OneOf
<
Eof
,
Exception
>
;
class
Buffer
{
public
:
uint64_t
consume
(
ArrayPtr
<
byte
>&
readBuffer
,
size_t
&
minBytes
);
// Consume as many bytes as possible, copying them into `readBuffer`. Return the number of bytes
// consumed.
//
// `readBuffer` and `minBytes` are both assigned appropriate new values, such that after any
// call to `consume()`, `readBuffer` will point to the remaining slice of unwritten space, and
// `minBytes` will have been decremented (clamped to zero) by the amount of bytes read. That is,
// the read can be considered fulfilled if `minBytes` is zero after a call to `consume()`.
Array
<
const
ArrayPtr
<
const
byte
>>
asArray
(
uint64_t
minBytes
,
uint64_t
&
amount
);
// Consume the first `minBytes` of the buffer (or the entire buffer) and return it in an Array
// of ArrayPtr<const byte>s, suitable for passing to AsyncOutputStream.write(). The outer Array
// owns the underlying data.
void
produce
(
Array
<
byte
>
bytes
);
// Enqueue a byte array to the end of the buffer list.
bool
empty
()
const
;
uint64_t
size
()
const
;
private
:
std
::
deque
<
Array
<
byte
>>
bufferList
;
};
class
Sink
{
public
:
struct
Need
{
// We use uint64_t here because:
// - pumpTo() accepts it as the `amount` parameter.
// - all practical values of tryRead()'s `maxBytes` parameter (a size_t) should also fit into
// a uint64_t, unless we're on a machine with multiple exabytes of memory ...
uint64_t
minBytes
=
0
;
uint64_t
maxBytes
=
kj
::
maxValue
;
};
virtual
Promise
<
void
>
fill
(
Buffer
&
inBuffer
,
const
Maybe
<
Stoppage
>&
stoppage
)
=
0
;
// Attempt to fill the sink with bytes andreturn a promise which must resolve before any inner
// read may be attempted. If a sink requires backpressure to be respected, this is how it should
// be communicated.
//
// If the sink is full, it must detach from the tee before the returned promise is resolved.
//
// The returned promise must not result in an exception.
virtual
Need
need
()
=
0
;
virtual
void
reject
(
Exception
&&
exception
)
=
0
;
// Inform this sink of a catastrophic exception and detach it. Regular read exceptions should be
// propagated through `fill()`'s stoppage parameter instead.
};
template
<
typename
T
>
class
SinkBase
:
public
Sink
{
// Registers itself with the tee as a sink on construction, detaches from the tee on
// fulfillment, rejection, or destruction.
//
// A bit of a Frankenstein, avert your eyes. For one thing, it's more of a mixin than a base...
public
:
explicit
SinkBase
(
PromiseFulfiller
<
T
>&
fulfiller
,
Maybe
<
Sink
&>&
sinkLink
)
:
fulfiller
(
fulfiller
),
sinkLink
(
sinkLink
)
{
KJ_ASSERT
(
sinkLink
==
nullptr
,
"sink initiated with sink already in flight"
);
sinkLink
=
*
this
;
}
KJ_DISALLOW_COPY
(
SinkBase
);
~
SinkBase
()
noexcept
(
false
)
{
detach
();
}
void
reject
(
Exception
&&
exception
)
override
{
// The tee is allowed to reject this sink if it needs to, e.g. to propagate a non-inner read
// exception from the pull loop. Only the derived class is allowed to fulfill() directly,
// though -- the tee must keep calling fill().
fulfiller
.
reject
(
mv
(
exception
));
detach
();
}
protected
:
template
<
typename
U
>
void
fulfill
(
U
value
)
{
fulfiller
.
fulfill
(
fwd
<
U
>
(
value
));
detach
();
}
private
:
void
detach
()
{
KJ_IF_MAYBE
(
sink
,
sinkLink
)
{
if
(
sink
==
this
)
{
sinkLink
=
nullptr
;
}
}
}
PromiseFulfiller
<
T
>&
fulfiller
;
Maybe
<
Sink
&>&
sinkLink
;
};
struct
Branch
{
Buffer
buffer
;
Maybe
<
Sink
&>
sink
;
};
class
ReadSink
final
:
public
SinkBase
<
size_t
>
{
public
:
explicit
ReadSink
(
PromiseFulfiller
<
size_t
>&
fulfiller
,
Maybe
<
Sink
&>&
registration
,
ArrayPtr
<
byte
>
buffer
,
size_t
minBytes
,
size_t
readSoFar
)
:
SinkBase
(
fulfiller
,
registration
),
buffer
(
buffer
),
minBytes
(
minBytes
),
readSoFar
(
readSoFar
)
{}
Promise
<
void
>
fill
(
Buffer
&
inBuffer
,
const
Maybe
<
Stoppage
>&
stoppage
)
override
{
auto
amount
=
inBuffer
.
consume
(
buffer
,
minBytes
);
readSoFar
+=
amount
;
if
(
minBytes
==
0
)
{
// We satisfied the read request.
fulfill
(
readSoFar
);
return
READY_NOW
;
}
if
(
amount
==
0
&&
inBuffer
.
empty
())
{
// We made no progress on the read request and the buffer is tapped out.
KJ_IF_MAYBE
(
reason
,
stoppage
)
{
if
(
reason
->
is
<
Eof
>
()
||
readSoFar
>
0
)
{
// Prefer short read to exception.
fulfill
(
readSoFar
);
}
else
{
reject
(
cp
(
reason
->
get
<
Exception
>
()));
}
return
READY_NOW
;
}
}
return
READY_NOW
;
}
Need
need
()
override
{
return
Need
{
minBytes
,
buffer
.
size
()
};
}
private
:
ArrayPtr
<
byte
>
buffer
;
size_t
minBytes
;
// Arguments to the outer tryRead() call, sliced/decremented after every buffer consumption.
size_t
readSoFar
;
// End result of the outer tryRead().
};
class
PumpSink
final
:
public
SinkBase
<
uint64_t
>
{
public
:
explicit
PumpSink
(
PromiseFulfiller
<
uint64_t
>&
fulfiller
,
Maybe
<
Sink
&>&
registration
,
AsyncOutputStream
&
output
,
uint64_t
limit
)
:
SinkBase
(
fulfiller
,
registration
),
output
(
output
),
limit
(
limit
)
{}
~
PumpSink
()
noexcept
(
false
)
{
canceler
.
cancel
(
"This pump has been canceled."
);
}
Promise
<
void
>
fill
(
Buffer
&
inBuffer
,
const
Maybe
<
Stoppage
>&
stoppage
)
override
{
KJ_ASSERT
(
limit
>
0
);
uint64_t
amount
=
0
;
// TODO(someday): This consumes data from the buffer, but we cannot know if the stream to
// which we're pumping will accept it until after the write() promise completes. If the
// write() promise rejects, we lose this data. We should consume the data from the buffer
// only after successful writes.
auto
writeBuffer
=
inBuffer
.
asArray
(
limit
,
amount
);
KJ_ASSERT
(
limit
>=
amount
);
if
(
amount
>
0
)
{
Promise
<
void
>
promise
=
nullptr
;
try
{
promise
=
canceler
.
wrap
(
output
.
write
(
writeBuffer
).
attach
(
mv
(
writeBuffer
)));
}
catch
(
const
Exception
&
exception
)
{
reject
(
cp
(
exception
));
return
READY_NOW
;
}
promise
=
promise
.
then
([
this
,
amount
]()
{
limit
-=
amount
;
pumpedSoFar
+=
amount
;
if
(
limit
==
0
)
{
fulfill
(
pumpedSoFar
);
}
}).
eagerlyEvaluate
([
this
](
Exception
&&
exception
)
{
reject
(
mv
(
exception
));
});
return
mv
(
promise
);
}
else
KJ_IF_MAYBE
(
reason
,
stoppage
)
{
if
(
reason
->
is
<
Eof
>
())
{
// Unlike in the read case, it makes more sense to immediately propagate exceptions to the
// pump promise rather than show it a "short pump".
fulfill
(
pumpedSoFar
);
}
else
{
reject
(
cp
(
reason
->
get
<
Exception
>
()));
}
}
return
READY_NOW
;
}
Need
need
()
override
{
return
Need
{
1
,
limit
};
}
private
:
AsyncOutputStream
&
output
;
uint64_t
limit
;
// Arguments to the outer pumpTo() call, decremented after every buffer consumption.
//
// Equal to zero once fulfiller has been fulfilled/rejected.
uint64_t
pumpedSoFar
=
0
;
// End result of the outer pumpTo().
Canceler
canceler
;
// When the pump is canceled, we also need to cancel any write operations in flight.
};
// =====================================================================================
Maybe
<
Sink
::
Need
>
analyzeSinks
()
{
// Return nullptr if there are no sinks at all. Otherwise, return the largest `minBytes` and the
// smallest `maxBytes` requested by any sink. The pull loop will use these values to calculate
// the optimal buffer size for the next inner read, so that a minimum amount of data is buffered
// at any given time.
uint64_t
minBytes
=
0
;
uint64_t
maxBytes
=
kj
::
maxValue
;
uint
nBranches
=
0
;
uint
nSinks
=
0
;
for
(
auto
&
state
:
branches
)
{
KJ_IF_MAYBE
(
s
,
state
)
{
++
nBranches
;
KJ_IF_MAYBE
(
sink
,
s
->
sink
)
{
++
nSinks
;
auto
need
=
sink
->
need
();
minBytes
=
kj
::
max
(
minBytes
,
need
.
minBytes
);
maxBytes
=
kj
::
min
(
maxBytes
,
need
.
maxBytes
);
}
}
}
if
(
nSinks
>
0
)
{
KJ_ASSERT
(
minBytes
>
0
);
KJ_ASSERT
(
maxBytes
>
0
,
"sink was filled but did not detach"
);
// Sinks may report non-overlapping needs.
maxBytes
=
kj
::
max
(
minBytes
,
maxBytes
);
return
Sink
::
Need
{
minBytes
,
maxBytes
};
}
// No active sinks.
return
nullptr
;
}
void
ensurePulling
()
{
if
(
!
pulling
)
{
pulling
=
true
;
UnwindDetector
unwind
;
KJ_DEFER
(
if
(
unwind
.
isUnwinding
())
pulling
=
false
);
pullPromise
=
pull
();
}
}
Promise
<
void
>
pull
()
{
// Use evalLater() so that two pump sinks added on the same turn of the event loop will not
// cause buffering.
return
evalLater
([
this
]
{
// Attempt to fill any sinks that exist.
Vector
<
Promise
<
void
>>
promises
;
for
(
auto
&
state
:
branches
)
{
KJ_IF_MAYBE
(
s
,
state
)
{
KJ_IF_MAYBE
(
sink
,
s
->
sink
)
{
promises
.
add
(
sink
->
fill
(
s
->
buffer
,
stoppage
));
}
}
}
// Respect the greatest of the sinks' backpressures.
return
joinPromises
(
promises
.
releaseAsArray
());
}).
then
([
this
]()
->
Promise
<
void
>
{
// Check to see whether we need to perform an inner read.
auto
need
=
analyzeSinks
();
if
(
need
==
nullptr
)
{
// No more sinks, stop pulling.
pulling
=
false
;
return
READY_NOW
;
}
if
(
stoppage
!=
nullptr
)
{
// We're eof or errored, don't read, but loop so we can fill the sink(s).
return
pull
();
}
auto
&
n
=
KJ_ASSERT_NONNULL
(
need
);
KJ_ASSERT
(
n
.
minBytes
>
0
);
// We must perform an inner read.
// We'd prefer not to explode our buffer, if that's cool. We cap `maxBytes` to the buffer size
// limit or our builtin MAX_BLOCK_SIZE, whichever is smaller. But, we make sure `maxBytes` is
// still >= `minBytes`.
n
.
maxBytes
=
kj
::
min
(
n
.
maxBytes
,
MAX_BLOCK_SIZE
);
n
.
maxBytes
=
kj
::
min
(
n
.
maxBytes
,
bufferSizeLimit
);
n
.
maxBytes
=
kj
::
max
(
n
.
minBytes
,
n
.
maxBytes
);
for
(
auto
&
state
:
branches
)
{
KJ_IF_MAYBE
(
s
,
state
)
{
// TODO(perf): buffer.size() is O(n) where n = # of individual heap-allocated byte arrays.
if
(
s
->
buffer
.
size
()
+
n
.
maxBytes
>
bufferSizeLimit
)
{
stoppage
=
Stoppage
(
KJ_EXCEPTION
(
FAILED
,
"tee buffer size limit exceeded"
));
return
pull
();
}
}
}
auto
heapBuffer
=
heapArray
<
byte
>
(
n
.
maxBytes
);
// gcc 4.9 quirk: If I don't hoist this into a separate variable and instead call
//
// inner->tryRead(heapBuffer.begin(), n.minBytes, heapBuffer.size())
//
// `heapBuffer` seems to get moved into the lambda capture before the arguments to `tryRead()`
// are evaluated, meaning `inner` sees a nullptr destination. Bizarrely, `inner` sees the
// correct value for `heapBuffer.size()`... I dunno, man.
auto
destination
=
heapBuffer
.
begin
();
try
{
return
inner
->
tryRead
(
destination
,
n
.
minBytes
,
n
.
maxBytes
)
.
then
([
this
,
heapBuffer
=
mv
(
heapBuffer
),
minBytes
=
n
.
minBytes
](
size_t
amount
)
mutable
->
Promise
<
void
>
{
length
=
length
.
map
([
amount
](
uint64_t
n
)
{
KJ_ASSERT
(
n
>=
amount
);
return
n
-
amount
;
});
if
(
amount
<
heapBuffer
.
size
())
{
heapBuffer
=
heapBuffer
.
slice
(
0
,
amount
).
attach
(
mv
(
heapBuffer
));
}
KJ_ASSERT
(
stoppage
==
nullptr
);
Maybe
<
ArrayPtr
<
byte
>>
bufferPtr
=
nullptr
;
for
(
auto
&
state
:
branches
)
{
KJ_IF_MAYBE
(
s
,
state
)
{
// Prefer to move the buffer into the receiving branch's deque, rather than memcpy.
//
// TODO(perf): For the 2-branch case, this is fine, since the majority of the time
// only one buffer will be in use. If we generalize to the n-branch case, this would
// become memcpy-heavy.
KJ_IF_MAYBE
(
ptr
,
bufferPtr
)
{
s
->
buffer
.
produce
(
heapArray
(
*
ptr
));
}
else
{
bufferPtr
=
ArrayPtr
<
byte
>
(
heapBuffer
);
s
->
buffer
.
produce
(
mv
(
heapBuffer
));
}
}
}
if
(
amount
<
minBytes
)
{
// Short read, EOF.
stoppage
=
Stoppage
(
Eof
());
}
return
pull
();
},
[
this
](
Exception
&&
exception
)
{
// Exception from the inner tryRead(). Propagate.
stoppage
=
Stoppage
(
mv
(
exception
));
return
pull
();
});
}
catch
(
const
Exception
&
exception
)
{
// Exception from the inner tryRead(). Propagate.
stoppage
=
Stoppage
(
cp
(
exception
));
return
pull
();
}
}).
eagerlyEvaluate
([
this
](
Exception
&&
exception
)
{
// Exception from our loop, not from inner tryRead(). Something is broken; tell everybody!
pulling
=
false
;
for
(
auto
&
state
:
branches
)
{
KJ_IF_MAYBE
(
s
,
state
)
{
KJ_IF_MAYBE
(
sink
,
s
->
sink
)
{
sink
->
reject
(
KJ_EXCEPTION
(
FAILED
,
"Exception in tee loop"
,
exception
));
}
}
}
});
}
constexpr
static
size_t
MAX_BLOCK_SIZE
=
1
<<
14
;
// 16k
Own
<
AsyncInputStream
>
inner
;
const
uint64_t
bufferSizeLimit
=
kj
::
maxValue
;
Maybe
<
uint64_t
>
length
;
Maybe
<
Branch
>
branches
[
2
];
Maybe
<
Stoppage
>
stoppage
;
Promise
<
void
>
pullPromise
=
READY_NOW
;
bool
pulling
=
false
;
};
constexpr
size_t
AsyncTee
::
MAX_BLOCK_SIZE
;
uint64_t
AsyncTee
::
Buffer
::
consume
(
ArrayPtr
<
byte
>&
readBuffer
,
size_t
&
minBytes
)
{
uint64_t
totalAmount
=
0
;
while
(
readBuffer
.
size
()
>
0
&&
!
bufferList
.
empty
())
{
auto
&
bytes
=
bufferList
.
front
();
auto
amount
=
kj
::
min
(
bytes
.
size
(),
readBuffer
.
size
());
memcpy
(
readBuffer
.
begin
(),
bytes
.
begin
(),
amount
);
totalAmount
+=
amount
;
readBuffer
=
readBuffer
.
slice
(
amount
,
readBuffer
.
size
());
minBytes
-=
kj
::
min
(
amount
,
minBytes
);
if
(
amount
==
bytes
.
size
())
{
bufferList
.
pop_front
();
}
else
{
bytes
=
heapArray
(
bytes
.
slice
(
amount
,
bytes
.
size
()));
return
totalAmount
;
}
}
return
totalAmount
;
}
void
AsyncTee
::
Buffer
::
produce
(
Array
<
byte
>
bytes
)
{
bufferList
.
push_back
(
mv
(
bytes
));
}
Array
<
const
ArrayPtr
<
const
byte
>>
AsyncTee
::
Buffer
::
asArray
(
uint64_t
maxBytes
,
uint64_t
&
amount
)
{
amount
=
0
;
Vector
<
ArrayPtr
<
const
byte
>>
buffers
;
Vector
<
Array
<
byte
>>
ownBuffers
;
while
(
maxBytes
>
0
&&
!
bufferList
.
empty
())
{
auto
&
bytes
=
bufferList
.
front
();
if
(
bytes
.
size
()
<=
maxBytes
)
{
amount
+=
bytes
.
size
();
maxBytes
-=
bytes
.
size
();
buffers
.
add
(
bytes
);
ownBuffers
.
add
(
mv
(
bytes
));
bufferList
.
pop_front
();
}
else
{
auto
ownBytes
=
heapArray
(
bytes
.
slice
(
0
,
maxBytes
));
buffers
.
add
(
ownBytes
);
ownBuffers
.
add
(
mv
(
ownBytes
));
bytes
=
heapArray
(
bytes
.
slice
(
maxBytes
,
bytes
.
size
()));
amount
+=
maxBytes
;
maxBytes
=
0
;
}
}
if
(
buffers
.
size
()
>
0
)
{
return
buffers
.
releaseAsArray
().
attach
(
mv
(
ownBuffers
));
}
return
{};
}
bool
AsyncTee
::
Buffer
::
empty
()
const
{
return
bufferList
.
empty
();
}
uint64_t
AsyncTee
::
Buffer
::
size
()
const
{
uint64_t
result
=
0
;
for
(
auto
&
bytes
:
bufferList
)
{
result
+=
bytes
.
size
();
}
return
result
;
}
class
TeeBranch
final
:
public
AsyncInputStream
{
public
:
TeeBranch
(
Own
<
AsyncTee
>
tee
,
uint8_t
branch
)
:
tee
(
mv
(
tee
)),
branch
(
branch
)
{
this
->
tee
->
addBranch
(
branch
);
}
~
TeeBranch
()
noexcept
(
false
)
{
unwind
.
catchExceptionsIfUnwinding
([
&
]()
{
tee
->
removeBranch
(
branch
);
});
}
Promise
<
size_t
>
tryRead
(
void
*
buffer
,
size_t
minBytes
,
size_t
maxBytes
)
override
{
return
tee
->
tryRead
(
branch
,
buffer
,
minBytes
,
maxBytes
);
}
Promise
<
uint64_t
>
pumpTo
(
AsyncOutputStream
&
output
,
uint64_t
amount
)
override
{
return
tee
->
pumpTo
(
branch
,
output
,
amount
);
}
Maybe
<
uint64_t
>
tryGetLength
()
override
{
return
tee
->
tryGetLength
(
branch
);
}
private
:
Own
<
AsyncTee
>
tee
;
const
uint8_t
branch
;
UnwindDetector
unwind
;
};
}
// namespace
Tee
newTee
(
Own
<
AsyncInputStream
>
input
,
uint64_t
limit
)
{
auto
impl
=
refcounted
<
AsyncTee
>
(
mv
(
input
),
limit
);
Own
<
AsyncInputStream
>
branch1
=
heap
<
TeeBranch
>
(
addRef
(
*
impl
),
0
);
Own
<
AsyncInputStream
>
branch2
=
heap
<
TeeBranch
>
(
mv
(
impl
),
1
);
return
{
{
mv
(
branch1
),
mv
(
branch2
)
}
};
}
Promise
<
Own
<
AsyncCapabilityStream
>>
AsyncCapabilityStream
::
receiveStream
()
{
return
tryReceiveStream
()
.
then
([](
Maybe
<
Own
<
AsyncCapabilityStream
>>&&
result
)
...
...
c++/src/kj/async-io.h
View file @
3c7748b7
...
...
@@ -203,6 +203,30 @@ struct CapabilityPipe {
Own
<
AsyncCapabilityStream
>
ends
[
2
];
};
struct
Tee
{
// Two AsyncInputStreams which each read the same data from some wrapped inner AsyncInputStream.
Own
<
AsyncInputStream
>
branches
[
2
];
};
Tee
newTee
(
Own
<
AsyncInputStream
>
input
,
uint64_t
limit
=
kj
::
maxValue
);
// Constructs a Tee that operates in-process. The tee buffers data if any read or pump operations is
// called on one of the two input ends. If a read or pump operation is subsequently called on the
// other input end, the buffered data is consumed.
//
// `pumpTo()` operations on the input ends will proactively read from the inner stream and block
// while writing to the output stream. While one branch has an active `pumpTo()` operation, any
// `tryRead()` operation on the other branch will not be allowed to read faster than allowed by the
// pump's backpressure. (In other words, it will never cause buffering on the pump.) Similarly, if
// there are `pumpTo()` operations active on both branches, the greater of the two backpressures is
// respected -- the two pumps progress in lockstep, and there is no buffering.
//
// At no point will a branch's buffer be allowed to grow beyond `limit` bytes. If the buffer would
// grow beyond the limit, an exception is generated, which both branches see once they have
// exhausted their buffers.
//
// It is recommended that you use a more conservative value for `limit` than the default.
class
ConnectionReceiver
{
// Represents a server socket listening on a port.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment