Commit eb7597fc authored by Kenton Varda's avatar Kenton Varda

Fix sendStream() failing if it can't complete immediately.

parent d8dd3233
......@@ -238,6 +238,47 @@ TEST(AsyncIo, CapabilityPipe) {
EXPECT_EQ("foo", result2);
}
TEST(AsyncIo, CapabilityPipeBlockedSendStream) {
// Check for a bug that existed at one point where if a sendStream() call couldn't complete
// immediately, it would fail.
auto io = setupAsyncIo();
auto pipe = io.provider->newCapabilityPipe();
Promise<void> promise = nullptr;
Own<AsyncIoStream> endpoint1;
uint nonBlockedCount = 0;
for (;;) {
auto pipe2 = io.provider->newCapabilityPipe();
promise = pipe.ends[0]->sendStream(kj::mv(pipe2.ends[0]));
if (promise.poll(io.waitScope)) {
// Send completed immediately, because there was enough space in the stream.
++nonBlockedCount;
promise.wait(io.waitScope);
} else {
// Send blocked! Let's continue with this promise then!
endpoint1 = kj::mv(pipe2.ends[1]);
break;
}
}
for (uint i KJ_UNUSED: kj::zeroTo(nonBlockedCount)) {
// Receive and ignore all the streams that were sent without blocking.
pipe.ends[1]->receiveStream().wait(io.waitScope);
}
// Now that write that blocked should have been able to complete.
promise.wait(io.waitScope);
// Now get the one that blocked.
auto endpoint2 = pipe.ends[1]->receiveStream().wait(io.waitScope);
endpoint1->write("foo", 3).wait(io.waitScope);
endpoint1->shutdownWrite();
KJ_EXPECT(endpoint2->readAllText().wait(io.waitScope) == "foo");
}
TEST(AsyncIo, CapabilityPipeMultiStreamMessage) {
auto ioContext = setupAsyncIo();
......
......@@ -216,7 +216,7 @@ public:
return downcast<AsyncStreamFd>(*stream).fd;
};
auto promise = writeInternal(data, moreData, fds);
return promise.attach(kj::mv(fds));
return promise.attach(kj::mv(fds), kj::mv(streams));
}
Promise<void> whenWriteDisconnected() override {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment