// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors // Licensed under the MIT License: // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #if _WIN32 // Request Vista-level APIs. #define WINVER 0x0600 #define _WIN32_WINNT 0x0600 #endif #include "async-io.h" #include "async-io-internal.h" #include "debug.h" #include <kj/compat/gtest.h> #include <sys/types.h> #if _WIN32 #include <ws2tcpip.h> #include "windows-sanity.h" #define inet_pton InetPtonA #define inet_ntop InetNtopA #else #include <netdb.h> #include <unistd.h> #include <fcntl.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #endif namespace kj { namespace { TEST(AsyncIo, SimpleNetwork) { auto ioContext = setupAsyncIo(); auto& network = ioContext.provider->getNetwork(); Own<ConnectionReceiver> listener; Own<AsyncIoStream> server; Own<AsyncIoStream> client; char receiveBuffer[4]; auto port = newPromiseAndFulfiller<uint>(); port.promise.then([&](uint portnum) { return network.parseAddress("localhost", portnum); }).then([&](Own<NetworkAddress>&& result) { return result->connect(); }).then([&](Own<AsyncIoStream>&& result) { client = kj::mv(result); return client->write("foo", 3); }).detach([](kj::Exception&& exception) { KJ_FAIL_EXPECT(exception); }); kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { listener = result->listen(); port.fulfiller->fulfill(listener->getPort()); return listener->accept(); }).then([&](Own<AsyncIoStream>&& result) { server = kj::mv(result); return server->tryRead(receiveBuffer, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer, n); }).wait(ioContext.waitScope); EXPECT_EQ("foo", result); } String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) { return network.parseAddress(text, portHint).wait(waitScope)->toString(); } bool systemSupportsAddress(StringPtr addr, StringPtr service = nullptr) { // Can getaddrinfo() parse this addresses? This is only true if the address family (e.g., ipv6) // is configured on at least one interface. (The loopback interface usually has both ipv4 and // ipv6 configured, but not always.) struct addrinfo* list; int status = getaddrinfo( addr.cStr(), service == nullptr ? nullptr : service.cStr(), nullptr, &list); if (status == 0) { freeaddrinfo(list); return true; } else { return false; } } TEST(AsyncIo, AddressParsing) { auto ioContext = setupAsyncIo(); auto& w = ioContext.waitScope; auto& network = ioContext.provider->getNetwork(); EXPECT_EQ("*:0", tryParse(w, network, "*")); EXPECT_EQ("*:123", tryParse(w, network, "*:123")); EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0")); EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678)); #if !_WIN32 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz")); EXPECT_EQ("unix-abstract:foo/bar/baz", tryParse(w, network, "unix-abstract:foo/bar/baz")); #endif // We can parse services by name... // // For some reason, Android and some various Linux distros do not support service names. if (systemSupportsAddress("1.2.3.4", "http")) { EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678)); EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678)); } else { KJ_LOG(WARNING, "system does not support resolving service names on ipv4; skipping tests"); } // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any // interfaces. if (systemSupportsAddress("::")) { EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123)); EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432)); if (systemSupportsAddress("12ab:cd::34", "http")) { EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678)); EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678)); } else { KJ_LOG(WARNING, "system does not support resolving service names on ipv6; skipping tests"); } } else { KJ_LOG(WARNING, "system does not support ipv6; skipping tests"); } // It would be nice to test DNS lookup here but the test would not be very hermetic. Even // localhost can map to different addresses depending on whether IPv6 is enabled. We do // connect to "localhost" in a different test, though. } TEST(AsyncIo, OneWayPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newOneWayPipe(); char receiveBuffer[4]; pipe.out->write("foo", 3).detach([](kj::Exception&& exception) { KJ_FAIL_EXPECT(exception); }); kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer, n); }).wait(ioContext.waitScope); EXPECT_EQ("foo", result); } TEST(AsyncIo, TwoWayPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newTwoWayPipe(); char receiveBuffer1[4]; char receiveBuffer2[4]; auto promise = pipe.ends[0]->write("foo", 3).then([&]() { return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer1, n); }); kj::String result = pipe.ends[1]->write("bar", 3).then([&]() { return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer2, n); }).wait(ioContext.waitScope); kj::String result2 = promise.wait(ioContext.waitScope); EXPECT_EQ("foo", result); EXPECT_EQ("bar", result2); } #if !_WIN32 && !__CYGWIN__ TEST(AsyncIo, CapabilityPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newCapabilityPipe(); auto pipe2 = ioContext.provider->newCapabilityPipe(); char receiveBuffer1[4]; char receiveBuffer2[4]; // Expect to receive a stream, then write "bar" to it, then receive "foo" from it. Own<AsyncCapabilityStream> receivedStream; auto promise = pipe2.ends[1]->receiveStream() .then([&](Own<AsyncCapabilityStream> stream) { receivedStream = kj::mv(stream); return receivedStream->write("bar", 3); }).then([&]() { return receivedStream->tryRead(receiveBuffer2, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer2, n); }); // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar" // from it. kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1])) .then([&]() { return pipe.ends[0]->write("foo", 3); }).then([&]() { return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer1, n); }).wait(ioContext.waitScope); kj::String result2 = promise.wait(ioContext.waitScope); EXPECT_EQ("bar", result); EXPECT_EQ("foo", result2); } #endif TEST(AsyncIo, PipeThread) { auto ioContext = setupAsyncIo(); auto pipeThread = ioContext.provider->newPipeThread( [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { char buf[4]; stream.write("foo", 3).wait(waitScope); EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); EXPECT_EQ("bar", heapString(buf, 3)); // Expect disconnect. EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope)); }); char buf[4]; pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); EXPECT_EQ("foo", heapString(buf, 3)); } TEST(AsyncIo, PipeThreadDisconnects) { // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting. auto ioContext = setupAsyncIo(); auto pipeThread = ioContext.provider->newPipeThread( [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { char buf[4]; stream.write("foo", 3).wait(waitScope); EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); EXPECT_EQ("bar", heapString(buf, 3)); }); char buf[4]; EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); EXPECT_EQ("foo", heapString(buf, 3)); pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); // Expect disconnect. EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope)); } TEST(AsyncIo, Timeouts) { auto ioContext = setupAsyncIo(); Timer& timer = ioContext.provider->getTimer(); auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE)); auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123)); EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; }) .wait(ioContext.waitScope)); EXPECT_EQ(123, promise2.wait(ioContext.waitScope)); } #if !_WIN32 // datagrams not implemented on win32 yet bool isMsgTruncBroken() { // Detect if the kernel fails to set MSG_TRUNC on recvmsg(). This seems to be the case at least // when running an arm64 binary under qemu. int fd; KJ_SYSCALL(fd = socket(AF_INET, SOCK_DGRAM, 0)); KJ_DEFER(close(fd)); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(0x7f000001); KJ_SYSCALL(bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); // Read back the assigned port. socklen_t len = sizeof(addr); KJ_SYSCALL(getsockname(fd, reinterpret_cast<struct sockaddr*>(&addr), &len)); KJ_ASSERT(len == sizeof(addr)); const char* message = "foobar"; KJ_SYSCALL(sendto(fd, message, strlen(message), 0, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); char buf[4]; struct iovec iov; iov.iov_base = buf; iov.iov_len = 3; struct msghdr msg; memset(&msg, 0, sizeof(msg)); msg.msg_iov = &iov; msg.msg_iovlen = 1; ssize_t n; KJ_SYSCALL(n = recvmsg(fd, &msg, 0)); KJ_ASSERT(n == 3); buf[3] = 0; KJ_ASSERT(kj::StringPtr(buf) == "foo"); return (msg.msg_flags & MSG_TRUNC) == 0; } TEST(AsyncIo, Udp) { bool msgTruncBroken = isMsgTruncBroken(); auto ioContext = setupAsyncIo(); auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope); auto port1 = addr->bindDatagramPort(); auto port2 = addr->bindDatagramPort(); auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort()) .wait(ioContext.waitScope); auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort()) .wait(ioContext.waitScope); Own<NetworkAddress> receivedAddr; { // Send a message and receive it. EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope)); auto receiver = port2->makeReceiver(); receiver->receive().wait(ioContext.waitScope); { auto content = receiver->getContent(); EXPECT_EQ("foo", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } receivedAddr = receiver->getSource().clone(); EXPECT_EQ(addr1->toString(), receivedAddr->toString()); { auto ancillary = receiver->getAncillary(); EXPECT_EQ(0, ancillary.value.size()); EXPECT_FALSE(ancillary.isTruncated); } // Receive a second message with the same receiver. { auto promise = receiver->receive(); // This time, start receiving before sending EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope)); promise.wait(ioContext.waitScope); auto content = receiver->getContent(); EXPECT_EQ("barbaz", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } } DatagramReceiver::Capacity capacity; capacity.content = 8; capacity.ancillary = 1024; { // Send a reply that will be truncated. EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope)); auto recv1 = port1->makeReceiver(capacity); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("01234567", kj::heapString(content.value.asChars())); EXPECT_TRUE(content.isTruncated || msgTruncBroken); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_EQ(0, ancillary.value.size()); EXPECT_FALSE(ancillary.isTruncated); } #if defined(IP_PKTINFO) && !__CYGWIN__ && !__aarch64__ // Set IP_PKTINFO header and try to receive it. // // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7. // // Doesn't work when running arm64 binaries under QEMU -- in fact, it crashes QEMU. We don't // have a good way to test if we're under QEMU so we just skip this test on aarch64. int one = 1; port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one)); EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("foo", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_FALSE(ancillary.isTruncated); ASSERT_EQ(1, ancillary.value.size()); auto message = ancillary.value[0]; EXPECT_EQ(IPPROTO_IP, message.getLevel()); EXPECT_EQ(IP_PKTINFO, message.getType()); EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size()); auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>()); EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1 } // See what happens if there's not quite enough space for in_pktinfo. capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8; recv1 = port1->makeReceiver(capacity); EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("bar", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_TRUE(ancillary.isTruncated || msgTruncBroken); // We might get a message, but it will be truncated. if (ancillary.value.size() != 0) { EXPECT_EQ(1, ancillary.value.size()); auto message = ancillary.value[0]; EXPECT_EQ(IPPROTO_IP, message.getLevel()); EXPECT_EQ(IP_PKTINFO, message.getType()); EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr); EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo)); } } // See what happens if there's not enough space even for the cmsghdr. capacity.ancillary = CMSG_SPACE(0) - 8; recv1 = port1->makeReceiver(capacity); EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("baz", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_TRUE(ancillary.isTruncated); EXPECT_EQ(0, ancillary.value.size()); } #endif } } #endif // !_WIN32 #ifdef __linux__ // Abstract unix sockets are only supported on Linux TEST(AsyncIo, AbstractUnixSocket) { auto ioContext = setupAsyncIo(); auto& network = ioContext.provider->getNetwork(); Own<NetworkAddress> addr = network.parseAddress("unix-abstract:foo").wait(ioContext.waitScope); Own<ConnectionReceiver> listener = addr->listen(); // chdir proves no filesystem dependence. Test fails for regular unix socket // but passes for abstract unix socket. int originalDirFd; KJ_SYSCALL(originalDirFd = open(".", O_RDONLY | O_DIRECTORY | O_CLOEXEC)); KJ_DEFER(close(originalDirFd)); KJ_SYSCALL(chdir("/")); KJ_DEFER(KJ_SYSCALL(fchdir(originalDirFd))); addr->connect().attach(kj::mv(listener)).wait(ioContext.waitScope); } #endif // __linux__ KJ_TEST("CIDR parsing") { KJ_EXPECT(_::CidrRange("1.2.3.4/16").toString() == "1.2.0.0/16"); KJ_EXPECT(_::CidrRange("1.2.255.4/18").toString() == "1.2.192.0/18"); KJ_EXPECT(_::CidrRange("1234::abcd:ffff:ffff/98").toString() == "1234::abcd:c000:0/98"); KJ_EXPECT(_::CidrRange::inet4({1,2,255,4}, 18).toString() == "1.2.192.0/18"); KJ_EXPECT(_::CidrRange::inet6({0x1234, 0x5678}, {0xabcd, 0xffff, 0xffff}, 98).toString() == "1234:5678::abcd:c000:0/98"); union { struct sockaddr addr; struct sockaddr_in addr4; struct sockaddr_in6 addr6; }; memset(&addr6, 0, sizeof(addr6)); { addr4.sin_family = AF_INET; addr4.sin_addr.s_addr = htonl(0x0102dfff); KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); KJ_EXPECT(!_::CidrRange("::/0").matches(&addr)); } { addr4.sin_family = AF_INET6; byte bytes[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; memcpy(addr6.sin6_addr.s6_addr, bytes, 16); KJ_EXPECT(_::CidrRange("0102:03ff::/24").matches(&addr)); KJ_EXPECT(!_::CidrRange("0102:02ff::/24").matches(&addr)); KJ_EXPECT(_::CidrRange("0102:02ff::/23").matches(&addr)); KJ_EXPECT(_::CidrRange("0102:0304:0506:0708:090a:0b0c:0d0e:0f10/128").matches(&addr)); KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); KJ_EXPECT(!_::CidrRange("0.0.0.0/0").matches(&addr)); } { addr4.sin_family = AF_INET6; inet_pton(AF_INET6, "::ffff:1.2.223.255", &addr6.sin6_addr); KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); } } bool allowed4(_::NetworkFilter& filter, StringPtr addrStr) { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; inet_pton(AF_INET, addrStr.cStr(), &addr.sin_addr); return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); } bool allowed6(_::NetworkFilter& filter, StringPtr addrStr) { struct sockaddr_in6 addr; memset(&addr, 0, sizeof(addr)); addr.sin6_family = AF_INET6; inet_pton(AF_INET6, addrStr.cStr(), &addr.sin6_addr); return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); } KJ_TEST("NetworkFilter") { _::NetworkFilter base; KJ_EXPECT(allowed4(base, "8.8.8.8")); KJ_EXPECT(!allowed4(base, "240.1.2.3")); { _::NetworkFilter filter({"public"}, {}, base); KJ_EXPECT(allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(!allowed4(filter, "192.168.0.1")); KJ_EXPECT(!allowed4(filter, "10.1.2.3")); KJ_EXPECT(!allowed4(filter, "127.0.0.1")); KJ_EXPECT(!allowed4(filter, "0.0.0.0")); KJ_EXPECT(allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); KJ_EXPECT(!allowed6(filter, "fc00::1234")); KJ_EXPECT(!allowed6(filter, "::1")); KJ_EXPECT(!allowed6(filter, "::")); } { _::NetworkFilter filter({"private"}, {"local"}, base); KJ_EXPECT(!allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(allowed4(filter, "192.168.0.1")); KJ_EXPECT(allowed4(filter, "10.1.2.3")); KJ_EXPECT(!allowed4(filter, "127.0.0.1")); KJ_EXPECT(!allowed4(filter, "0.0.0.0")); KJ_EXPECT(!allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); KJ_EXPECT(allowed6(filter, "fc00::1234")); KJ_EXPECT(!allowed6(filter, "::1")); KJ_EXPECT(!allowed6(filter, "::")); } { _::NetworkFilter filter({"1.0.0.0/8", "1.2.3.0/24"}, {"1.2.0.0/16", "1.2.3.4/32"}, base); KJ_EXPECT(!allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(allowed4(filter, "1.0.0.1")); KJ_EXPECT(!allowed4(filter, "1.2.2.1")); KJ_EXPECT(allowed4(filter, "1.2.3.1")); KJ_EXPECT(!allowed4(filter, "1.2.3.4")); } } KJ_TEST("Network::restrictPeers()") { auto ioContext = setupAsyncIo(); auto& w = ioContext.waitScope; auto& network = ioContext.provider->getNetwork(); auto restrictedNetwork = network.restrictPeers({"public"}); KJ_EXPECT(tryParse(w, *restrictedNetwork, "8.8.8.8") == "8.8.8.8:0"); #if !_WIN32 KJ_EXPECT_THROW_MESSAGE("restrictPeers", tryParse(w, *restrictedNetwork, "unix:/foo")); #endif auto addr = restrictedNetwork->parseAddress("127.0.0.1").wait(w); auto listener = addr->listen(); auto acceptTask = listener->accept() .then([](kj::Own<kj::AsyncIoStream>) { KJ_FAIL_EXPECT("should not have received connection"); }).eagerlyEvaluate(nullptr); KJ_EXPECT_THROW_MESSAGE("restrictPeers", addr->connect().wait(w)); // We can connect to the listener but the connection will be immediately closed. auto addr2 = network.parseAddress("127.0.0.1", listener->getPort()).wait(w); auto conn = addr2->connect().wait(w); KJ_EXPECT(conn->readAllText().wait(w) == ""); } kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { if (expected.size() == 0) return kj::READY_NOW; auto buffer = kj::heapArray<char>(expected.size()); auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { if (amount == 0) { KJ_FAIL_ASSERT("expected data never sent", expected); } auto actual = buffer.slice(0, amount); if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); } return expectRead(in, expected.slice(amount)); })); } class MockAsyncInputStream final: public AsyncInputStream { public: MockAsyncInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize) : bytes(bytes), blockSize(blockSize) {} kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { // Clamp max read to blockSize. size_t n = kj::min(blockSize, maxBytes); // Unless that's less than minBytes -- in which case, use minBytes. n = kj::max(n, minBytes); // But also don't read more data than we have. n = kj::min(n, bytes.size()); memcpy(buffer, bytes.begin(), n); bytes = bytes.slice(n, bytes.size()); return n; } private: kj::ArrayPtr<const byte> bytes; size_t blockSize; }; KJ_TEST("AsyncInputStream::readAllText() / readAllBytes()") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); size_t inputSizes[] = { 0, 1, 256, 4096, 8191, 8192, 8193, 10000, bigText.size() }; size_t blockSizes[] = { 1, 4, 256, 4096, 8192, bigText.size() }; uint64_t limits[] = { 0, 1, 256, bigText.size() / 2, bigText.size() - 1, bigText.size(), bigText.size() + 1, kj::maxValue }; for (size_t inputSize: inputSizes) { for (size_t blockSize: blockSizes) { for (uint64_t limit: limits) { KJ_CONTEXT(inputSize, blockSize, limit); auto textSlice = bigText.asBytes().slice(0, inputSize); auto readAllText = [&]() { MockAsyncInputStream input(textSlice, blockSize); return input.readAllText(limit).wait(ws); }; auto readAllBytes = [&]() { MockAsyncInputStream input(textSlice, blockSize); return input.readAllBytes(limit).wait(ws); }; if (limit > inputSize) { KJ_EXPECT(readAllText().asBytes() == textSlice); KJ_EXPECT(readAllBytes() == textSlice); } else { KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllText()); KJ_EXPECT_THROW_MESSAGE("Reached limit before EOF.", readAllBytes()); } } } } } KJ_TEST("Userland pipe") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); char buf[4]; KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3); buf[3] = '\0'; KJ_EXPECT(buf == "foo"_kj); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe cancel write") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto promise = pipe.out->write("foobar", 6); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); KJ_EXPECT(!promise.poll(ws)); promise = nullptr; promise = pipe.out->write("baz", 3); expectRead(*pipe.in, "baz").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); } KJ_TEST("Userland pipe cancel read") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto writeOp = pipe.out->write("foo", 3); auto readOp = expectRead(*pipe.in, "foobar"); writeOp.wait(ws); KJ_EXPECT(!readOp.poll(ws)); readOp = nullptr; auto writeOp2 = pipe.out->write("baz", 3); expectRead(*pipe.in, "baz").wait(ws); } KJ_TEST("Userland pipe pumpTo") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); auto promise2 = pipe2.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe tryPumpFrom") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); auto promise2 = pipe2.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(!promise2.poll(ws)); KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe pumpTo cancel") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); auto promise = pipe.out->write("foobar", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); // Cancel pump. pumpPromise = nullptr; auto promise3 = pipe2.out->write("baz", 3); expectRead(*pipe2.in, "baz").wait(ws); } KJ_TEST("Userland pipe tryPumpFrom cancel") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto promise = pipe.out->write("foobar", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); // Cancel pump. pumpPromise = nullptr; auto promise3 = pipe2.out->write("baz", 3); expectRead(*pipe2.in, "baz").wait(ws); } KJ_TEST("Userland pipe with limit") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(6); { auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); promise.wait(ws); } { auto promise = pipe.in->readAllText(); KJ_EXPECT(!promise.poll(ws)); auto promise2 = pipe.out->write("barbaz", 6); KJ_EXPECT(promise.wait(ws) == "bar"); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); } // Further writes throw and reads return EOF. KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); } KJ_TEST("Userland pipe pumpTo with limit") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(6); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); { auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); } { auto promise = expectRead(*pipe2.in, "bar"); KJ_EXPECT(!promise.poll(ws)); auto promise2 = pipe.out->write("barbaz", 6); promise.wait(ws); pumpPromise.wait(ws); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); } // Further writes throw. KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); } KJ_TEST("Userland pipe pump into zero-limited pipe, no data to pump") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(uint64_t(0)); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); expectRead(*pipe2.in, ""); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 0); } KJ_TEST("Userland pipe pump into zero-limited pipe, data is pumped") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(uint64_t(0)); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); expectRead(*pipe2.in, ""); auto writePromise = pipe.out->write("foo", 3); KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pumpPromise.wait(ws)); } KJ_TEST("Userland pipe gather write") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foobar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); expectRead(*pipe.in, "bar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "fo").wait(ws); expectRead(*pipe.in, "obar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foob").wait(ws); expectRead(*pipe.in, "ar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write pump") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 6); } KJ_TEST("Userland pipe gather write pump split on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); expectRead(*pipe2.in, "bar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 6); } KJ_TEST("Userland pipe gather write pump split mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "fo").wait(ws); expectRead(*pipe2.in, "obar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 6); } KJ_TEST("Userland pipe gather write pump split mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foob").wait(ws); expectRead(*pipe2.in, "ar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 6); } KJ_TEST("Userland pipe gather write split pump on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3) .then([&](uint64_t i) { KJ_EXPECT(i == 3); return pipe.in->pumpTo(*pipe2.out, 3); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe gather write split pump mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2) .then([&](uint64_t i) { KJ_EXPECT(i == 2); return pipe.in->pumpTo(*pipe2.out, 4); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 4); } KJ_TEST("Userland pipe gather write split pump mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4) .then([&](uint64_t i) { KJ_EXPECT(i == 4); return pipe.in->pumpTo(*pipe2.out, 2); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 2); } KJ_TEST("Userland pipe gather write pumpFrom") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; char c; auto eofPromise = pipe2.in->tryRead(&c, 1, 1); eofPromise.poll(ws); // force pump to notice EOF KJ_EXPECT(pumpPromise.wait(ws) == 6); pipe2.out = nullptr; KJ_EXPECT(eofPromise.wait(ws) == 0); } KJ_TEST("Userland pipe gather write pumpFrom split on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); expectRead(*pipe2.in, "bar").wait(ws); promise.wait(ws); pipe.out = nullptr; char c; auto eofPromise = pipe2.in->tryRead(&c, 1, 1); eofPromise.poll(ws); // force pump to notice EOF KJ_EXPECT(pumpPromise.wait(ws) == 6); pipe2.out = nullptr; KJ_EXPECT(eofPromise.wait(ws) == 0); } KJ_TEST("Userland pipe gather write pumpFrom split mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "fo").wait(ws); expectRead(*pipe2.in, "obar").wait(ws); promise.wait(ws); pipe.out = nullptr; char c; auto eofPromise = pipe2.in->tryRead(&c, 1, 1); eofPromise.poll(ws); // force pump to notice EOF KJ_EXPECT(pumpPromise.wait(ws) == 6); pipe2.out = nullptr; KJ_EXPECT(eofPromise.wait(ws) == 0); } KJ_TEST("Userland pipe gather write pumpFrom split mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foob").wait(ws); expectRead(*pipe2.in, "ar").wait(ws); promise.wait(ws); pipe.out = nullptr; char c; auto eofPromise = pipe2.in->tryRead(&c, 1, 1); eofPromise.poll(ws); // force pump to notice EOF KJ_EXPECT(pumpPromise.wait(ws) == 6); pipe2.out = nullptr; KJ_EXPECT(eofPromise.wait(ws) == 0); } KJ_TEST("Userland pipe gather write split pumpFrom on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3)) .then([&](uint64_t i) { KJ_EXPECT(i == 3); return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3)); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe gather write split pumpFrom mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2)) .then([&](uint64_t i) { KJ_EXPECT(i == 2); return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4)); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 4); } KJ_TEST("Userland pipe gather write split pumpFrom mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4)) .then([&](uint64_t i) { KJ_EXPECT(i == 4); return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2)); }); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 2); } KJ_TEST("Userland pipe pumpTo less than write amount") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1); auto pieces = kj::heapArray<ArrayPtr<const byte>>(2); byte a[1] = { 'a' }; byte b[1] = { 'b' }; pieces[0] = arrayPtr(a, 1); pieces[1] = arrayPtr(b, 1); auto writePromise = pipe.out->write(pieces); KJ_EXPECT(!writePromise.poll(ws)); expectRead(*pipe2.in, "a").wait(ws); KJ_EXPECT(pumpPromise.wait(ws) == 1); KJ_EXPECT(!writePromise.poll(ws)); pumpPromise = pipe.in->pumpTo(*pipe2.out, 1); expectRead(*pipe2.in, "b").wait(ws); KJ_EXPECT(pumpPromise.wait(ws) == 1); writePromise.wait(ws); } KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto promise = pipe.out->write("foobar", 6); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foobar").wait(ws); promise.wait(ws); KJ_EXPECT(!pumpPromise.poll(ws)); pipe.out = nullptr; pipe2.in = nullptr; // force pump to notice EOF KJ_EXPECT(pumpPromise.wait(ws) == 6); pipe2.out = nullptr; } KJ_TEST("Userland pipe EOF fulfills pumpFrom promise") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto writePromise = pipe.out->write("foobar", 6); KJ_EXPECT(!writePromise.poll(ws)); auto pipe3 = newOneWayPipe(); auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out); KJ_EXPECT(!pumpPromise2.poll(ws)); expectRead(*pipe3.in, "foobar").wait(ws); writePromise.wait(ws); KJ_EXPECT(!pumpPromise.poll(ws)); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 6); KJ_EXPECT(!pumpPromise2.poll(ws)); pipe2.out = nullptr; KJ_EXPECT(pumpPromise2.wait(ws) == 6); } KJ_TEST("Userland pipe tryPumpFrom to pumpTo for same amount fulfills simultaneously") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 6)); auto writePromise = pipe.out->write("foobar", 6); KJ_EXPECT(!writePromise.poll(ws)); auto pipe3 = newOneWayPipe(); auto pumpPromise2 = pipe2.in->pumpTo(*pipe3.out, 6); KJ_EXPECT(!pumpPromise2.poll(ws)); expectRead(*pipe3.in, "foobar").wait(ws); writePromise.wait(ws); KJ_EXPECT(pumpPromise.wait(ws) == 6); KJ_EXPECT(pumpPromise2.wait(ws) == 6); } constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14; // AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing KJ_TEST("Userland tee") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto tee = newTee(kj::mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto writePromise = pipe.out->write("foobar", 6); expectRead(*left, "foobar").wait(ws); writePromise.wait(ws); expectRead(*right, "foobar").wait(ws); } KJ_TEST("Userland tee concurrent read") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto tee = newTee(kj::mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); uint8_t leftBuf[6] = { 0 }; uint8_t rightBuf[6] = { 0 }; auto leftPromise = left->tryRead(leftBuf, 6, 6); auto rightPromise = right->tryRead(rightBuf, 6, 6); KJ_EXPECT(!leftPromise.poll(ws)); KJ_EXPECT(!rightPromise.poll(ws)); pipe.out->write("foobar", 6).wait(ws); KJ_EXPECT(leftPromise.wait(ws) == 6); KJ_EXPECT(rightPromise.wait(ws) == 6); KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); } KJ_TEST("Userland tee cancel and restart read") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto tee = newTee(kj::mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto writePromise = pipe.out->write("foobar", 6); { // Initiate a read and immediately cancel it. uint8_t buf[6] = { 0 }; auto promise = left->tryRead(buf, 6, 6); } // Subsequent reads still see the full data. expectRead(*left, "foobar").wait(ws); writePromise.wait(ws); expectRead(*right, "foobar").wait(ws); } KJ_TEST("Userland tee cancel read and destroy branch then read other branch") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto tee = newTee(kj::mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto writePromise = pipe.out->write("foobar", 6); { // Initiate a read and immediately cancel it. uint8_t buf[6] = { 0 }; auto promise = left->tryRead(buf, 6, 6); } // And destroy the branch for good measure. left = nullptr; // Subsequent reads on the other branch still see the full data. expectRead(*right, "foobar").wait(ws); writePromise.wait(ws); } KJ_TEST("Userland tee subsequent other-branch reads are READY_NOW") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto tee = newTee(kj::mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); uint8_t leftBuf[6] = { 0 }; auto leftPromise = left->tryRead(leftBuf, 6, 6); // This is the first read, so there should NOT be buffered data. KJ_EXPECT(!leftPromise.poll(ws)); pipe.out->write("foobar", 6).wait(ws); leftPromise.wait(ws); KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); uint8_t rightBuf[6] = { 0 }; auto rightPromise = right->tryRead(rightBuf, 6, 6); // The left read promise was fulfilled, so there SHOULD be buffered data. KJ_EXPECT(rightPromise.poll(ws)); rightPromise.wait(ws); KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); } KJ_TEST("Userland tee read EOF propagation") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto writePromise = pipe.out->write("foobar", 6); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); // Lengthless pipe, so ... KJ_EXPECT(left->tryGetLength() == nullptr); KJ_EXPECT(right->tryGetLength() == nullptr); uint8_t leftBuf[7] = { 0 }; auto leftPromise = left->tryRead(leftBuf, size(leftBuf), size(leftBuf)); writePromise.wait(ws); // Destroying the output side should force a short read. pipe.out = nullptr; KJ_EXPECT(leftPromise.wait(ws) == 6); KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); // And we should see a short read here, too. uint8_t rightBuf[7] = { 0 }; auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf)); KJ_EXPECT(rightPromise.wait(ws) == 6); KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); // Further reads should all be short. KJ_EXPECT(left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws) == 0); KJ_EXPECT(right->tryRead(rightBuf, 1, size(rightBuf)).wait(ws) == 0); } KJ_TEST("Userland tee read exception propagation") { kj::EventLoop loop; WaitScope ws(loop); // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe // ended prematurely" exception when we destroy the output side early. auto pipe = newOneWayPipe(7); auto writePromise = pipe.out->write("foobar", 6); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); // Test tryGetLength() while we're at it. KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 7); KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7); uint8_t leftBuf[7] = { 0 }; auto leftPromise = left->tryRead(leftBuf, 6, size(leftBuf)); writePromise.wait(ws); // Destroying the output side should force a fulfillment of the read (since we reached minBytes). pipe.out = nullptr; KJ_EXPECT(leftPromise.wait(ws) == 6); KJ_EXPECT(memcmp(leftBuf, "foobar", 6) == 0); // The next read sees the exception. KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws)); // Test tryGetLength() here -- the unread branch still sees the original length value. KJ_EXPECT(KJ_ASSERT_NONNULL(left->tryGetLength()) == 1); KJ_EXPECT(KJ_ASSERT_NONNULL(right->tryGetLength()) == 7); // We should see the buffered data on the other side, even though we don't reach our minBytes. uint8_t rightBuf[7] = { 0 }; auto rightPromise = right->tryRead(rightBuf, size(rightBuf), size(rightBuf)); KJ_EXPECT(rightPromise.wait(ws) == 6); KJ_EXPECT(memcmp(rightBuf, "foobar", 6) == 0); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", right->tryRead(rightBuf, 1, size(leftBuf)).wait(ws)); // Further reads should all see the exception again. KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", left->tryRead(leftBuf, 1, size(leftBuf)).wait(ws)); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", right->tryRead(rightBuf, 1, size(leftBuf)).wait(ws)); } KJ_TEST("Userland tee read exception propagation w/ data loss") { kj::EventLoop loop; WaitScope ws(loop); // Make a pipe expecting to read more than we're actually going to write. This will force a "pipe // ended prematurely" exception once the pipe sees a short read. auto pipe = newOneWayPipe(7); auto writePromise = pipe.out->write("foobar", 6); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); uint8_t leftBuf[7] = { 0 }; auto leftPromise = left->tryRead(leftBuf, 7, 7); writePromise.wait(ws); // Destroying the output side should force an exception, since we didn't reach our minBytes. pipe.out = nullptr; KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPromise.wait(ws)); // And we should see a short read here, too. In fact, we shouldn't see anything: the short read // above read all of the pipe's data, but then failed to buffer it because it encountered an // exception. It buffered the exception, instead. uint8_t rightBuf[7] = { 0 }; KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", right->tryRead(rightBuf, 1, 1).wait(ws)); } KJ_TEST("Userland tee read into different buffer sizes") { kj::EventLoop loop; WaitScope ws(loop); auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); uint8_t leftBuf[5] = { 0 }; uint8_t rightBuf[11] = { 0 }; auto leftPromise = left->tryRead(leftBuf, 5, 5); auto rightPromise = right->tryRead(rightBuf, 11, 11); KJ_EXPECT(leftPromise.wait(ws) == 5); KJ_EXPECT(rightPromise.wait(ws) == 11); } KJ_TEST("Userland tee reads see max(minBytes...) and min(maxBytes...)") { kj::EventLoop loop; WaitScope ws(loop); auto tee = newTee(heap<MockAsyncInputStream>("foo bar baz"_kj.asBytes(), 11)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); { uint8_t leftBuf[5] = { 0 }; uint8_t rightBuf[11] = { 0 }; // Subrange of another range. The smaller maxBytes should win. auto leftPromise = left->tryRead(leftBuf, 3, 5); auto rightPromise = right->tryRead(rightBuf, 1, 11); KJ_EXPECT(leftPromise.wait(ws) == 5); KJ_EXPECT(rightPromise.wait(ws) == 5); } { uint8_t leftBuf[5] = { 0 }; uint8_t rightBuf[11] = { 0 }; // Disjoint ranges. The larger minBytes should win. auto leftPromise = left->tryRead(leftBuf, 3, 5); auto rightPromise = right->tryRead(rightBuf, 6, 11); KJ_EXPECT(leftPromise.wait(ws) == 5); KJ_EXPECT(rightPromise.wait(ws) == 6); KJ_EXPECT(left->tryRead(leftBuf, 1, 2).wait(ws) == 1); } } KJ_TEST("Userland tee read stress test") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftBuffer = heapArray<byte>(bigText.size()); { auto leftSlice = leftBuffer.slice(0, leftBuffer.size()); while (leftSlice.size() > 0) { for (size_t blockSize: { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59 }) { if (leftSlice.size() == 0) break; auto maxBytes = min(blockSize, leftSlice.size()); auto amount = left->tryRead(leftSlice.begin(), 1, maxBytes).wait(ws); leftSlice = leftSlice.slice(amount, leftSlice.size()); } } } KJ_EXPECT(memcmp(leftBuffer.begin(), bigText.begin(), leftBuffer.size()) == 0); KJ_EXPECT(right->readAllText().wait(ws) == bigText); } KJ_TEST("Userland tee pump") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out, 7); KJ_EXPECT(!leftPumpPromise.poll(ws)); auto rightPumpPromise = right->pumpTo(*rightPipe.out); // Neither are ready yet, because the left pump's backpressure has blocked the AsyncTee's pull // loop until we read from leftPipe. KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); expectRead(*leftPipe.in, "foo bar").wait(ws); KJ_EXPECT(leftPumpPromise.wait(ws) == 7); KJ_EXPECT(!rightPumpPromise.poll(ws)); // We should be able to read up to how far the left side pumped, and beyond. The left side will // now have data in its buffer. expectRead(*rightPipe.in, "foo bar baz,foo bar baz,foo").wait(ws); // Consume the left side buffer. expectRead(*left, " baz,foo bar").wait(ws); // We can destroy the left branch entirely and the right branch will still see all data. left = nullptr; KJ_EXPECT(!rightPumpPromise.poll(ws)); auto allTextPromise = rightPipe.in->readAllText(); KJ_EXPECT(rightPumpPromise.wait(ws) == bigText.size()); // Need to force an EOF in the right pipe to check the result. rightPipe.out = nullptr; KJ_EXPECT(allTextPromise.wait(ws) == bigText.slice(27)); } KJ_TEST("Userland tee pump slows down reads") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); // The left pump will cause some data to be buffered on the right branch, which we can read. auto rightExpectation0 = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE)); expectRead(*right, rightExpectation0).wait(ws); // But the next right branch read is blocked by the left pipe's backpressure. auto rightExpectation1 = kj::str(bigText.slice(TEE_MAX_CHUNK_SIZE, TEE_MAX_CHUNK_SIZE + 10)); auto rightPromise = expectRead(*right, rightExpectation1); KJ_EXPECT(!rightPromise.poll(ws)); // The right branch read finishes when we relieve the pressure in the left pipe. auto allTextPromise = leftPipe.in->readAllText(); rightPromise.wait(ws); KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size()); leftPipe.out = nullptr; KJ_EXPECT(allTextPromise.wait(ws) == bigText); } KJ_TEST("Userland tee pump EOF propagation") { kj::EventLoop loop; WaitScope ws(loop); { // EOF encountered by two pump operations. auto pipe = newOneWayPipe(); auto writePromise = pipe.out->write("foo bar", 7); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); // Pump the first bit, and block. auto leftPumpPromise = left->pumpTo(*leftPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); auto rightPumpPromise = right->pumpTo(*rightPipe.out); writePromise.wait(ws); KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); // Induce an EOF. We should see it propagated to both pump promises. pipe.out = nullptr; // Relieve backpressure. auto leftAllPromise = leftPipe.in->readAllText(); auto rightAllPromise = rightPipe.in->readAllText(); KJ_EXPECT(leftPumpPromise.wait(ws) == 7); KJ_EXPECT(rightPumpPromise.wait(ws) == 7); // Make sure we got the data on the pipes that were being pumped to. KJ_EXPECT(!leftAllPromise.poll(ws)); KJ_EXPECT(!rightAllPromise.poll(ws)); leftPipe.out = nullptr; rightPipe.out = nullptr; KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar"); } { // EOF encountered by a read and pump operation. auto pipe = newOneWayPipe(); auto writePromise = pipe.out->write("foo bar", 7); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); // Pump one branch, read another. auto leftPumpPromise = left->pumpTo(*leftPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); expectRead(*right, "foo bar").wait(ws); writePromise.wait(ws); uint8_t dummy = 0; auto rightReadPromise = right->tryRead(&dummy, 1, 1); // Induce an EOF. We should see it propagated to both the read and pump promises. pipe.out = nullptr; // Relieve backpressure in the tee to see the EOF. auto leftAllPromise = leftPipe.in->readAllText(); KJ_EXPECT(leftPumpPromise.wait(ws) == 7); KJ_EXPECT(rightReadPromise.wait(ws) == 0); // Make sure we got the data on the pipe that was being pumped to. KJ_EXPECT(!leftAllPromise.poll(ws)); leftPipe.out = nullptr; KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); } } KJ_TEST("Userland tee pump EOF on chunk boundary") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); // Conjure an EOF right on the boundary of the tee's internal chunk. auto chunkText = kj::str(bigText.slice(0, TEE_MAX_CHUNK_SIZE)); auto tee = newTee(heap<MockAsyncInputStream>(chunkText.asBytes(), chunkText.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out); auto rightPumpPromise = right->pumpTo(*rightPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); auto leftAllPromise = leftPipe.in->readAllText(); auto rightAllPromise = rightPipe.in->readAllText(); // The pumps should see the EOF and stop. KJ_EXPECT(leftPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE); KJ_EXPECT(rightPumpPromise.wait(ws) == TEE_MAX_CHUNK_SIZE); // Verify that we saw the data on the other end of the destination pipes. leftPipe.out = nullptr; rightPipe.out = nullptr; KJ_EXPECT(leftAllPromise.wait(ws) == chunkText); KJ_EXPECT(rightAllPromise.wait(ws) == chunkText); } KJ_TEST("Userland tee pump read exception propagation") { kj::EventLoop loop; WaitScope ws(loop); { // Exception encountered by two pump operations. auto pipe = newOneWayPipe(14); auto writePromise = pipe.out->write("foo bar", 7); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); // Pump the first bit, and block. auto leftPumpPromise = left->pumpTo(*leftPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); auto rightPumpPromise = right->pumpTo(*rightPipe.out); writePromise.wait(ws); KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); // Induce a read exception. We should see it propagated to both pump promises. pipe.out = nullptr; // Both promises must exist before the backpressure in the tee is relieved, and the tee pull // loop actually sees the exception. auto leftAllPromise = leftPipe.in->readAllText(); auto rightAllPromise = rightPipe.in->readAllText(); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPumpPromise.wait(ws)); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", rightPumpPromise.wait(ws)); // Make sure we got the data on the destination pipes. KJ_EXPECT(!leftAllPromise.poll(ws)); KJ_EXPECT(!rightAllPromise.poll(ws)); leftPipe.out = nullptr; rightPipe.out = nullptr; KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); KJ_EXPECT(rightAllPromise.wait(ws) == "foo bar"); } { // Exception encountered by a read and pump operation. auto pipe = newOneWayPipe(14); auto writePromise = pipe.out->write("foo bar", 7); auto tee = newTee(mv(pipe.in)); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); // Pump one branch, read another. auto leftPumpPromise = left->pumpTo(*leftPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); expectRead(*right, "foo bar").wait(ws); writePromise.wait(ws); uint8_t dummy = 0; auto rightReadPromise = right->tryRead(&dummy, 1, 1); // Induce a read exception. We should see it propagated to both the read and pump promises. pipe.out = nullptr; // Relieve backpressure in the tee to see the exceptions. auto leftAllPromise = leftPipe.in->readAllText(); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", leftPumpPromise.wait(ws)); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("pipe ended prematurely", rightReadPromise.wait(ws)); // Make sure we got the data on the destination pipe. KJ_EXPECT(!leftAllPromise.poll(ws)); leftPipe.out = nullptr; KJ_EXPECT(leftAllPromise.wait(ws) == "foo bar"); } } KJ_TEST("Userland tee pump write exception propagation") { kj::EventLoop loop; WaitScope ws(loop); auto bigText = strArray(kj::repeat("foo bar baz"_kj, 12345), ","); auto tee = newTee(heap<MockAsyncInputStream>(bigText.asBytes(), bigText.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); // Set up two pumps and let them block. auto leftPipe = newOneWayPipe(); auto rightPipe = newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out); auto rightPumpPromise = right->pumpTo(*rightPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); // Induce a write exception in the right branch pump. It should propagate to the right pump // promise. rightPipe.in = nullptr; KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("read end of pipe was aborted", rightPumpPromise.wait(ws)); // The left pump promise does not see the right branch's write exception. KJ_EXPECT(!leftPumpPromise.poll(ws)); auto allTextPromise = leftPipe.in->readAllText(); KJ_EXPECT(leftPumpPromise.wait(ws) == bigText.size()); leftPipe.out = nullptr; KJ_EXPECT(allTextPromise.wait(ws) == bigText); } KJ_TEST("Userland tee pump cancellation implies write cancellation") { kj::EventLoop loop; WaitScope ws(loop); auto text = "foo bar baz"_kj; auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size())); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out); // Arrange to block the left pump on its write operation. expectRead(*right, "foo ").wait(ws); KJ_EXPECT(!leftPumpPromise.poll(ws)); // Then cancel the pump, while it's still blocked. leftPumpPromise = nullptr; // It should cancel its write operations, so it should now be safe to destroy the output stream to // which it was pumping. try { leftPipe.out = nullptr; } catch (const Exception& exception) { KJ_FAIL_EXPECT("write promises were not canceled", exception); } } KJ_TEST("Userland tee buffer size limit") { kj::EventLoop loop; WaitScope ws(loop); auto text = "foo bar baz"_kj; { // We can carefully read data to stay under our ridiculously low limit. auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); expectRead(*left, "fo").wait(ws); expectRead(*right, "foo ").wait(ws); expectRead(*left, "o ba").wait(ws); expectRead(*right, "bar ").wait(ws); expectRead(*left, "r ba").wait(ws); expectRead(*right, "baz").wait(ws); expectRead(*left, "z").wait(ws); } { // Exceeding the limit causes both branches to see the exception after exhausting their buffers. auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); expectRead(*left, "fo").wait(ws); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded", expectRead(*left, "o").wait(ws)); expectRead(*right, "fo").wait(ws); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("tee buffer size limit exceeded", expectRead(*right, "o").wait(ws)); } { // We guarantee that two pumps started simultaneously will never exceed our buffer size limit. auto tee = newTee(heap<MockAsyncInputStream>(text.asBytes(), text.size()), 2); auto left = kj::mv(tee.branches[0]); auto right = kj::mv(tee.branches[1]); auto leftPipe = kj::newOneWayPipe(); auto rightPipe = kj::newOneWayPipe(); auto leftPumpPromise = left->pumpTo(*leftPipe.out); auto rightPumpPromise = right->pumpTo(*rightPipe.out); KJ_EXPECT(!leftPumpPromise.poll(ws)); KJ_EXPECT(!rightPumpPromise.poll(ws)); uint8_t leftBuf[11] = { 0 }; uint8_t rightBuf[11] = { 0 }; // The first read on the left pipe will succeed. auto leftPromise = leftPipe.in->tryRead(leftBuf, 1, 11); KJ_EXPECT(leftPromise.wait(ws) == 2); KJ_EXPECT(memcmp(leftBuf, text.begin(), 2) == 0); // But the second will block until we relieve pressure on the right pipe. leftPromise = leftPipe.in->tryRead(leftBuf + 2, 1, 9); KJ_EXPECT(!leftPromise.poll(ws)); // Relieve the right pipe pressure ... auto rightPromise = rightPipe.in->tryRead(rightBuf, 1, 11); KJ_EXPECT(rightPromise.wait(ws) == 2); KJ_EXPECT(memcmp(rightBuf, text.begin(), 2) == 0); // Now the second left pipe read will complete. KJ_EXPECT(leftPromise.wait(ws) == 2); KJ_EXPECT(memcmp(leftBuf, text.begin(), 4) == 0); // Leapfrog the left branch with the right. There should be 2 bytes in the buffer, so we can // demand a total of 4. rightPromise = rightPipe.in->tryRead(rightBuf + 2, 4, 9); KJ_EXPECT(rightPromise.wait(ws) == 4); KJ_EXPECT(memcmp(rightBuf, text.begin(), 6) == 0); // Leapfrog the right with the left. We demand the entire rest of the stream, so this should // block. Note that a regular read for this amount on one of the tee branches directly would // exceed our buffer size limit, but this one does not, because we have the pipe to regulate // backpressure for us. leftPromise = leftPipe.in->tryRead(leftBuf + 4, 7, 7); KJ_EXPECT(!leftPromise.poll(ws)); // Ask for the entire rest of the stream on the right branch and wrap things up. rightPromise = rightPipe.in->tryRead(rightBuf + 6, 5, 5); KJ_EXPECT(leftPromise.wait(ws) == 7); KJ_EXPECT(memcmp(leftBuf, text.begin(), 11) == 0); KJ_EXPECT(rightPromise.wait(ws) == 5); KJ_EXPECT(memcmp(rightBuf, text.begin(), 11) == 0); } } KJ_TEST("Userspace OneWayPipe whenWriteDisconnected()") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto abortedPromise = pipe.out->whenWriteDisconnected(); KJ_ASSERT(!abortedPromise.poll(ws)); pipe.in = nullptr; KJ_ASSERT(abortedPromise.poll(ws)); abortedPromise.wait(ws); } KJ_TEST("Userspace TwoWayPipe whenWriteDisconnected()") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newTwoWayPipe(); auto abortedPromise = pipe.ends[0]->whenWriteDisconnected(); KJ_ASSERT(!abortedPromise.poll(ws)); pipe.ends[1] = nullptr; KJ_ASSERT(abortedPromise.poll(ws)); abortedPromise.wait(ws); } #if !_WIN32 // We don't currently support detecting disconnect with IOCP. #if !__CYGWIN__ // TODO(soon): Figure out why whenWriteDisconnected() doesn't work on Cygwin. KJ_TEST("OS OneWayPipe whenWriteDisconnected()") { auto io = setupAsyncIo(); auto pipe = io.provider->newOneWayPipe(); pipe.out->write("foo", 3).wait(io.waitScope); auto abortedPromise = pipe.out->whenWriteDisconnected(); KJ_ASSERT(!abortedPromise.poll(io.waitScope)); pipe.in = nullptr; KJ_ASSERT(abortedPromise.poll(io.waitScope)); abortedPromise.wait(io.waitScope); } KJ_TEST("OS TwoWayPipe whenWriteDisconnected()") { auto io = setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); pipe.ends[0]->write("foo", 3).wait(io.waitScope); pipe.ends[1]->write("bar", 3).wait(io.waitScope); auto abortedPromise = pipe.ends[0]->whenWriteDisconnected(); KJ_ASSERT(!abortedPromise.poll(io.waitScope)); pipe.ends[1] = nullptr; KJ_ASSERT(abortedPromise.poll(io.waitScope)); abortedPromise.wait(io.waitScope); char buffer[4]; KJ_ASSERT(pipe.ends[0]->tryRead(&buffer, sizeof(buffer), sizeof(buffer)).wait(io.waitScope) == 3); buffer[3] = '\0'; KJ_EXPECT(buffer == "bar"_kj); } KJ_TEST("import socket FD that's already broken") { auto io = setupAsyncIo(); int fds[2]; KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); KJ_SYSCALL(write(fds[1], "foo", 3)); KJ_SYSCALL(close(fds[1])); auto stream = io.lowLevelProvider->wrapSocketFd(fds[0], LowLevelAsyncIoProvider::TAKE_OWNERSHIP); auto abortedPromise = stream->whenWriteDisconnected(); KJ_ASSERT(abortedPromise.poll(io.waitScope)); abortedPromise.wait(io.waitScope); char buffer[4]; KJ_ASSERT(stream->tryRead(&buffer, sizeof(buffer), sizeof(buffer)).wait(io.waitScope) == 3); buffer[3] = '\0'; KJ_EXPECT(buffer == "foo"_kj); } #endif // !__CYGWIN__ #endif // !_WIN32 } // namespace } // namespace kj