// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors // Licensed under the MIT License: // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #if _WIN32 // Request Vista-level APIs. 
#define WINVER 0x0600 #define _WIN32_WINNT 0x0600 #endif #include "async-io.h" #include "async-io-internal.h" #include "debug.h" #include <kj/compat/gtest.h> #include <sys/types.h> #if _WIN32 #include <ws2tcpip.h> #include "windows-sanity.h" #define inet_pton InetPtonA #define inet_ntop InetNtopA #else #include <netdb.h> #include <unistd.h> #include <fcntl.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #endif namespace kj { namespace { TEST(AsyncIo, SimpleNetwork) { auto ioContext = setupAsyncIo(); auto& network = ioContext.provider->getNetwork(); Own<ConnectionReceiver> listener; Own<AsyncIoStream> server; Own<AsyncIoStream> client; char receiveBuffer[4]; auto port = newPromiseAndFulfiller<uint>(); port.promise.then([&](uint portnum) { return network.parseAddress("localhost", portnum); }).then([&](Own<NetworkAddress>&& result) { return result->connect(); }).then([&](Own<AsyncIoStream>&& result) { client = kj::mv(result); return client->write("foo", 3); }).detach([](kj::Exception&& exception) { KJ_FAIL_EXPECT(exception); }); kj::String result = network.parseAddress("*").then([&](Own<NetworkAddress>&& result) { listener = result->listen(); port.fulfiller->fulfill(listener->getPort()); return listener->accept(); }).then([&](Own<AsyncIoStream>&& result) { server = kj::mv(result); return server->tryRead(receiveBuffer, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer, n); }).wait(ioContext.waitScope); EXPECT_EQ("foo", result); } String tryParse(WaitScope& waitScope, Network& network, StringPtr text, uint portHint = 0) { return network.parseAddress(text, portHint).wait(waitScope)->toString(); } bool systemSupportsAddress(StringPtr addr, StringPtr service = nullptr) { // Can getaddrinfo() parse this addresses? This is only true if the address family (e.g., ipv6) // is configured on at least one interface. (The loopback interface usually has both ipv4 and // ipv6 configured, but not always.) 
struct addrinfo* list; int status = getaddrinfo( addr.cStr(), service == nullptr ? nullptr : service.cStr(), nullptr, &list); if (status == 0) { freeaddrinfo(list); return true; } else { return false; } } TEST(AsyncIo, AddressParsing) { auto ioContext = setupAsyncIo(); auto& w = ioContext.waitScope; auto& network = ioContext.provider->getNetwork(); EXPECT_EQ("*:0", tryParse(w, network, "*")); EXPECT_EQ("*:123", tryParse(w, network, "*:123")); EXPECT_EQ("0.0.0.0:0", tryParse(w, network, "0.0.0.0")); EXPECT_EQ("1.2.3.4:5678", tryParse(w, network, "1.2.3.4", 5678)); #if !_WIN32 EXPECT_EQ("unix:foo/bar/baz", tryParse(w, network, "unix:foo/bar/baz")); EXPECT_EQ("unix-abstract:foo/bar/baz", tryParse(w, network, "unix-abstract:foo/bar/baz")); #endif // We can parse services by name... // // For some reason, Android and some various Linux distros do not support service names. if (systemSupportsAddress("1.2.3.4", "http")) { EXPECT_EQ("1.2.3.4:80", tryParse(w, network, "1.2.3.4:http", 5678)); EXPECT_EQ("*:80", tryParse(w, network, "*:http", 5678)); } else { KJ_LOG(WARNING, "system does not support resolving service names on ipv4; skipping tests"); } // IPv6 tests. Annoyingly, these don't work on machines that don't have IPv6 configured on any // interfaces. if (systemSupportsAddress("::")) { EXPECT_EQ("[::]:123", tryParse(w, network, "0::0", 123)); EXPECT_EQ("[12ab:cd::34]:321", tryParse(w, network, "[12ab:cd:0::0:34]:321", 432)); if (systemSupportsAddress("12ab:cd::34", "http")) { EXPECT_EQ("[::]:80", tryParse(w, network, "[::]:http", 5678)); EXPECT_EQ("[12ab:cd::34]:80", tryParse(w, network, "[12ab:cd::34]:http", 5678)); } else { KJ_LOG(WARNING, "system does not support resolving service names on ipv6; skipping tests"); } } else { KJ_LOG(WARNING, "system does not support ipv6; skipping tests"); } // It would be nice to test DNS lookup here but the test would not be very hermetic. Even // localhost can map to different addresses depending on whether IPv6 is enabled. 
We do // connect to "localhost" in a different test, though. } TEST(AsyncIo, OneWayPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newOneWayPipe(); char receiveBuffer[4]; pipe.out->write("foo", 3).detach([](kj::Exception&& exception) { KJ_FAIL_EXPECT(exception); }); kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer, n); }).wait(ioContext.waitScope); EXPECT_EQ("foo", result); } TEST(AsyncIo, TwoWayPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newTwoWayPipe(); char receiveBuffer1[4]; char receiveBuffer2[4]; auto promise = pipe.ends[0]->write("foo", 3).then([&]() { return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer1, n); }); kj::String result = pipe.ends[1]->write("bar", 3).then([&]() { return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer2, n); }).wait(ioContext.waitScope); kj::String result2 = promise.wait(ioContext.waitScope); EXPECT_EQ("foo", result); EXPECT_EQ("bar", result2); } #if !_WIN32 TEST(AsyncIo, CapabilityPipe) { auto ioContext = setupAsyncIo(); auto pipe = ioContext.provider->newCapabilityPipe(); auto pipe2 = ioContext.provider->newCapabilityPipe(); char receiveBuffer1[4]; char receiveBuffer2[4]; // Expect to receive a stream, then write "bar" to it, then receive "foo" from it. Own<AsyncCapabilityStream> receivedStream; auto promise = pipe2.ends[1]->receiveStream() .then([&](Own<AsyncCapabilityStream> stream) { receivedStream = kj::mv(stream); return receivedStream->write("bar", 3); }).then([&]() { return receivedStream->tryRead(receiveBuffer2, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer2, n); }); // Send a stream, then write "foo" to the other end of the sent stream, then receive "bar" // from it. 
kj::String result = pipe2.ends[0]->sendStream(kj::mv(pipe.ends[1])) .then([&]() { return pipe.ends[0]->write("foo", 3); }).then([&]() { return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4); }).then([&](size_t n) { EXPECT_EQ(3u, n); return heapString(receiveBuffer1, n); }).wait(ioContext.waitScope); kj::String result2 = promise.wait(ioContext.waitScope); EXPECT_EQ("bar", result); EXPECT_EQ("foo", result2); } #endif TEST(AsyncIo, PipeThread) { auto ioContext = setupAsyncIo(); auto pipeThread = ioContext.provider->newPipeThread( [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { char buf[4]; stream.write("foo", 3).wait(waitScope); EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); EXPECT_EQ("bar", heapString(buf, 3)); // Expect disconnect. EXPECT_EQ(0, stream.tryRead(buf, 1, 1).wait(waitScope)); }); char buf[4]; pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); EXPECT_EQ("foo", heapString(buf, 3)); } TEST(AsyncIo, PipeThreadDisconnects) { // Like above, but in this case we expect the main thread to detect the pipe thread disconnecting. auto ioContext = setupAsyncIo(); auto pipeThread = ioContext.provider->newPipeThread( [](AsyncIoProvider& ioProvider, AsyncIoStream& stream, WaitScope& waitScope) { char buf[4]; stream.write("foo", 3).wait(waitScope); EXPECT_EQ(3u, stream.tryRead(buf, 3, 4).wait(waitScope)); EXPECT_EQ("bar", heapString(buf, 3)); }); char buf[4]; EXPECT_EQ(3u, pipeThread.pipe->tryRead(buf, 3, 4).wait(ioContext.waitScope)); EXPECT_EQ("foo", heapString(buf, 3)); pipeThread.pipe->write("bar", 3).wait(ioContext.waitScope); // Expect disconnect. 
EXPECT_EQ(0, pipeThread.pipe->tryRead(buf, 1, 1).wait(ioContext.waitScope)); } TEST(AsyncIo, Timeouts) { auto ioContext = setupAsyncIo(); Timer& timer = ioContext.provider->getTimer(); auto promise1 = timer.timeoutAfter(10 * MILLISECONDS, kj::Promise<void>(kj::NEVER_DONE)); auto promise2 = timer.timeoutAfter(100 * MILLISECONDS, kj::Promise<int>(123)); EXPECT_TRUE(promise1.then([]() { return false; }, [](kj::Exception&& e) { return true; }) .wait(ioContext.waitScope)); EXPECT_EQ(123, promise2.wait(ioContext.waitScope)); } #if !_WIN32 // datagrams not implemented on win32 yet bool isMsgTruncBroken() { // Detect if the kernel fails to set MSG_TRUNC on recvmsg(). This seems to be the case at least // when running an arm64 binary under qemu. int fd; KJ_SYSCALL(fd = socket(AF_INET, SOCK_DGRAM, 0)); KJ_DEFER(close(fd)); struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(0x7f000001); KJ_SYSCALL(bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); // Read back the assigned port. 
socklen_t len = sizeof(addr); KJ_SYSCALL(getsockname(fd, reinterpret_cast<struct sockaddr*>(&addr), &len)); KJ_ASSERT(len == sizeof(addr)); const char* message = "foobar"; KJ_SYSCALL(sendto(fd, message, strlen(message), 0, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))); char buf[4]; struct iovec iov; iov.iov_base = buf; iov.iov_len = 3; struct msghdr msg; memset(&msg, 0, sizeof(msg)); msg.msg_iov = &iov; msg.msg_iovlen = 1; ssize_t n; KJ_SYSCALL(n = recvmsg(fd, &msg, 0)); KJ_ASSERT(n == 3); buf[3] = 0; KJ_ASSERT(kj::StringPtr(buf) == "foo"); return (msg.msg_flags & MSG_TRUNC) == 0; } TEST(AsyncIo, Udp) { bool msgTruncBroken = isMsgTruncBroken(); auto ioContext = setupAsyncIo(); auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope); auto port1 = addr->bindDatagramPort(); auto port2 = addr->bindDatagramPort(); auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort()) .wait(ioContext.waitScope); auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort()) .wait(ioContext.waitScope); Own<NetworkAddress> receivedAddr; { // Send a message and receive it. EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope)); auto receiver = port2->makeReceiver(); receiver->receive().wait(ioContext.waitScope); { auto content = receiver->getContent(); EXPECT_EQ("foo", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } receivedAddr = receiver->getSource().clone(); EXPECT_EQ(addr1->toString(), receivedAddr->toString()); { auto ancillary = receiver->getAncillary(); EXPECT_EQ(0, ancillary.value.size()); EXPECT_FALSE(ancillary.isTruncated); } // Receive a second message with the same receiver. 
{ auto promise = receiver->receive(); // This time, start receiving before sending EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope)); promise.wait(ioContext.waitScope); auto content = receiver->getContent(); EXPECT_EQ("barbaz", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } } DatagramReceiver::Capacity capacity; capacity.content = 8; capacity.ancillary = 1024; { // Send a reply that will be truncated. EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope)); auto recv1 = port1->makeReceiver(capacity); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("01234567", kj::heapString(content.value.asChars())); EXPECT_TRUE(content.isTruncated || msgTruncBroken); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_EQ(0, ancillary.value.size()); EXPECT_FALSE(ancillary.isTruncated); } #if defined(IP_PKTINFO) && !__CYGWIN__ && !__aarch64__ // Set IP_PKTINFO header and try to receive it. // // Doesn't work on Cygwin; see: https://cygwin.com/ml/cygwin/2009-01/msg00350.html // TODO(someday): Might work on more-recent Cygwin; I'm still testing against 1.7. // // Doesn't work when running arm64 binaries under QEMU -- in fact, it crashes QEMU. We don't // have a good way to test if we're under QEMU so we just skip this test on aarch64. 
int one = 1; port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one)); EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("foo", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_FALSE(ancillary.isTruncated); ASSERT_EQ(1, ancillary.value.size()); auto message = ancillary.value[0]; EXPECT_EQ(IPPROTO_IP, message.getLevel()); EXPECT_EQ(IP_PKTINFO, message.getType()); EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size()); auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>()); EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1 } // See what happens if there's not quite enough space for in_pktinfo. capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8; recv1 = port1->makeReceiver(capacity); EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("bar", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_TRUE(ancillary.isTruncated || msgTruncBroken); // We might get a message, but it will be truncated. if (ancillary.value.size() != 0) { EXPECT_EQ(1, ancillary.value.size()); auto message = ancillary.value[0]; EXPECT_EQ(IPPROTO_IP, message.getLevel()); EXPECT_EQ(IP_PKTINFO, message.getType()); EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr); EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo)); } } // See what happens if there's not enough space even for the cmsghdr. 
capacity.ancillary = CMSG_SPACE(0) - 8; recv1 = port1->makeReceiver(capacity); EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope)); recv1->receive().wait(ioContext.waitScope); { auto content = recv1->getContent(); EXPECT_EQ("baz", kj::heapString(content.value.asChars())); EXPECT_FALSE(content.isTruncated); } EXPECT_EQ(addr2->toString(), recv1->getSource().toString()); { auto ancillary = recv1->getAncillary(); EXPECT_TRUE(ancillary.isTruncated); EXPECT_EQ(0, ancillary.value.size()); } #endif } } #endif // !_WIN32 #ifdef __linux__ // Abstract unix sockets are only supported on Linux TEST(AsyncIo, AbstractUnixSocket) { auto ioContext = setupAsyncIo(); auto& network = ioContext.provider->getNetwork(); Own<NetworkAddress> addr = network.parseAddress("unix-abstract:foo").wait(ioContext.waitScope); Own<ConnectionReceiver> listener = addr->listen(); // chdir proves no filesystem dependence. Test fails for regular unix socket // but passes for abstract unix socket. int originalDirFd; KJ_SYSCALL(originalDirFd = open(".", O_RDONLY | O_DIRECTORY | O_CLOEXEC)); KJ_DEFER(close(originalDirFd)); KJ_SYSCALL(chdir("/tmp")); KJ_DEFER(KJ_SYSCALL(fchdir(originalDirFd))); addr->connect().attach(kj::mv(listener)).wait(ioContext.waitScope); } #endif // __linux__ KJ_TEST("CIDR parsing") { KJ_EXPECT(_::CidrRange("1.2.3.4/16").toString() == "1.2.0.0/16"); KJ_EXPECT(_::CidrRange("1.2.255.4/18").toString() == "1.2.192.0/18"); KJ_EXPECT(_::CidrRange("1234::abcd:ffff:ffff/98").toString() == "1234::abcd:c000:0/98"); KJ_EXPECT(_::CidrRange::inet4({1,2,255,4}, 18).toString() == "1.2.192.0/18"); KJ_EXPECT(_::CidrRange::inet6({0x1234, 0x5678}, {0xabcd, 0xffff, 0xffff}, 98).toString() == "1234:5678::abcd:c000:0/98"); union { struct sockaddr addr; struct sockaddr_in addr4; struct sockaddr_in6 addr6; }; memset(&addr6, 0, sizeof(addr6)); { addr4.sin_family = AF_INET; addr4.sin_addr.s_addr = htonl(0x0102dfff); KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); 
KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); KJ_EXPECT(!_::CidrRange("::/0").matches(&addr)); } { addr4.sin_family = AF_INET6; byte bytes[16] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; memcpy(addr6.sin6_addr.s6_addr, bytes, 16); KJ_EXPECT(_::CidrRange("0102:03ff::/24").matches(&addr)); KJ_EXPECT(!_::CidrRange("0102:02ff::/24").matches(&addr)); KJ_EXPECT(_::CidrRange("0102:02ff::/23").matches(&addr)); KJ_EXPECT(_::CidrRange("0102:0304:0506:0708:090a:0b0c:0d0e:0f10/128").matches(&addr)); KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); KJ_EXPECT(!_::CidrRange("0.0.0.0/0").matches(&addr)); } { addr4.sin_family = AF_INET6; inet_pton(AF_INET6, "::ffff:1.2.223.255", &addr6.sin6_addr); KJ_EXPECT(_::CidrRange("1.2.255.255/18").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.2.255.255/19").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.0.0/16").matches(&addr)); KJ_EXPECT(!_::CidrRange("1.3.0.0/16").matches(&addr)); KJ_EXPECT(_::CidrRange("1.2.223.255/32").matches(&addr)); KJ_EXPECT(_::CidrRange("0.0.0.0/0").matches(&addr)); KJ_EXPECT(_::CidrRange("::/0").matches(&addr)); } } bool allowed4(_::NetworkFilter& filter, StringPtr addrStr) { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; inet_pton(AF_INET, addrStr.cStr(), &addr.sin_addr); return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); } bool allowed6(_::NetworkFilter& filter, StringPtr addrStr) { struct sockaddr_in6 addr; memset(&addr, 0, sizeof(addr)); addr.sin6_family = AF_INET6; inet_pton(AF_INET6, addrStr.cStr(), &addr.sin6_addr); return filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)); } KJ_TEST("NetworkFilter") { _::NetworkFilter base; KJ_EXPECT(allowed4(base, "8.8.8.8")); 
KJ_EXPECT(!allowed4(base, "240.1.2.3")); { _::NetworkFilter filter({"public"}, {}, base); KJ_EXPECT(allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(!allowed4(filter, "192.168.0.1")); KJ_EXPECT(!allowed4(filter, "10.1.2.3")); KJ_EXPECT(!allowed4(filter, "127.0.0.1")); KJ_EXPECT(!allowed4(filter, "0.0.0.0")); KJ_EXPECT(allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); KJ_EXPECT(!allowed6(filter, "fc00::1234")); KJ_EXPECT(!allowed6(filter, "::1")); KJ_EXPECT(!allowed6(filter, "::")); } { _::NetworkFilter filter({"private"}, {"local"}, base); KJ_EXPECT(!allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(allowed4(filter, "192.168.0.1")); KJ_EXPECT(allowed4(filter, "10.1.2.3")); KJ_EXPECT(!allowed4(filter, "127.0.0.1")); KJ_EXPECT(!allowed4(filter, "0.0.0.0")); KJ_EXPECT(!allowed6(filter, "2400:cb00:2048:1::c629:d7a2")); KJ_EXPECT(allowed6(filter, "fc00::1234")); KJ_EXPECT(!allowed6(filter, "::1")); KJ_EXPECT(!allowed6(filter, "::")); } { _::NetworkFilter filter({"1.0.0.0/8", "1.2.3.0/24"}, {"1.2.0.0/16", "1.2.3.4/32"}, base); KJ_EXPECT(!allowed4(filter, "8.8.8.8")); KJ_EXPECT(!allowed4(filter, "240.1.2.3")); KJ_EXPECT(allowed4(filter, "1.0.0.1")); KJ_EXPECT(!allowed4(filter, "1.2.2.1")); KJ_EXPECT(allowed4(filter, "1.2.3.1")); KJ_EXPECT(!allowed4(filter, "1.2.3.4")); } } KJ_TEST("Network::restrictPeers()") { auto ioContext = setupAsyncIo(); auto& w = ioContext.waitScope; auto& network = ioContext.provider->getNetwork(); auto restrictedNetwork = network.restrictPeers({"public"}); KJ_EXPECT(tryParse(w, *restrictedNetwork, "8.8.8.8") == "8.8.8.8:0"); #if !_WIN32 KJ_EXPECT_THROW_MESSAGE("restrictPeers", tryParse(w, *restrictedNetwork, "unix:/foo")); #endif auto addr = restrictedNetwork->parseAddress("127.0.0.1").wait(w); auto listener = addr->listen(); auto acceptTask = listener->accept() .then([](kj::Own<kj::AsyncIoStream>) { KJ_FAIL_EXPECT("should not have received connection"); 
}).eagerlyEvaluate(nullptr); KJ_EXPECT_THROW_MESSAGE("restrictPeers", addr->connect().wait(w)); // We can connect to the listener but the connection will be immediately closed. auto addr2 = network.parseAddress("127.0.0.1", listener->getPort()).wait(w); auto conn = addr2->connect().wait(w); KJ_EXPECT(conn->readAllText().wait(w) == ""); } kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { if (expected.size() == 0) return kj::READY_NOW; auto buffer = kj::heapArray<char>(expected.size()); auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { if (amount == 0) { KJ_FAIL_ASSERT("expected data never sent", expected); } auto actual = buffer.slice(0, amount); if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); } return expectRead(in, expected.slice(amount)); })); } KJ_TEST("Userland pipe") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); char buf[4]; KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3); buf[3] = '\0'; KJ_EXPECT(buf == "foo"_kj); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe cancel write") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto promise = pipe.out->write("foobar", 6); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); KJ_EXPECT(!promise.poll(ws)); promise = nullptr; promise = pipe.out->write("baz", 3); expectRead(*pipe.in, "baz").wait(ws); promise.wait(ws); pipe.out = nullptr; KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); } KJ_TEST("Userland pipe cancel read") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto writeOp = 
pipe.out->write("foo", 3); auto readOp = expectRead(*pipe.in, "foobar"); writeOp.wait(ws); KJ_EXPECT(!readOp.poll(ws)); readOp = nullptr; auto writeOp2 = pipe.out->write("baz", 3); expectRead(*pipe.in, "baz").wait(ws); } KJ_TEST("Userland pipe pumpTo") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); auto promise2 = pipe2.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe tryPumpFrom") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); auto promise2 = pipe2.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(!promise2.poll(ws)); KJ_EXPECT(pumpPromise.wait(ws) == 3); } KJ_TEST("Userland pipe pumpTo cancel") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); auto promise = pipe.out->write("foobar", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); // Cancel pump. pumpPromise = nullptr; auto promise3 = pipe2.out->write("baz", 3); expectRead(*pipe2.in, "baz").wait(ws); } KJ_TEST("Userland pipe tryPumpFrom cancel") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); auto pipe2 = newOneWayPipe(); auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in)); auto promise = pipe.out->write("foobar", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); // Cancel pump. 
pumpPromise = nullptr; auto promise3 = pipe2.out->write("baz", 3); expectRead(*pipe2.in, "baz").wait(ws); } KJ_TEST("Userland pipe with limit") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(6); { auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); promise.wait(ws); } { auto promise = pipe.in->readAllText(); KJ_EXPECT(!promise.poll(ws)); auto promise2 = pipe.out->write("barbaz", 6); KJ_EXPECT(promise.wait(ws) == "bar"); KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); } // Further writes throw and reads return EOF. KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); KJ_EXPECT(pipe.in->readAllText().wait(ws) == ""); } KJ_TEST("Userland pipe pumpTo with limit") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(6); auto pipe2 = newOneWayPipe(); auto pumpPromise = pipe.in->pumpTo(*pipe2.out); { auto promise = pipe.out->write("foo", 3); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe2.in, "foo").wait(ws); promise.wait(ws); } { auto promise = expectRead(*pipe2.in, "bar"); KJ_EXPECT(!promise.poll(ws)); auto promise2 = pipe.out->write("barbaz", 6); promise.wait(ws); pumpPromise.wait(ws); KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws)); } // Further writes throw. 
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws)); } KJ_TEST("Userland pipe gather write") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foobar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split on buffer boundary") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foo").wait(ws); expectRead(*pipe.in, "bar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split mid-first-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "fo").wait(ws); expectRead(*pipe.in, "obar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; KJ_EXPECT(promise2.wait(ws) == ""); } KJ_TEST("Userland pipe gather write split mid-second-buffer") { kj::EventLoop loop; WaitScope ws(loop); auto pipe = newOneWayPipe(); ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() }; auto promise = pipe.out->write(parts); KJ_EXPECT(!promise.poll(ws)); expectRead(*pipe.in, "foob").wait(ws); expectRead(*pipe.in, "ar").wait(ws); promise.wait(ws); auto promise2 = pipe.in->readAllText(); KJ_EXPECT(!promise2.poll(ws)); pipe.out = nullptr; 
KJ_EXPECT(promise2.wait(ws) == ""); }

// Gather-write of two parts ("foo", "bar") into `pipe` while `pipe.in` is being pumped
// (without a byte limit) into `pipe2.out`. The data is consumed from `pipe2.in` in one
// six-byte expectRead(), and the pump is expected to report 6 bytes in total.
KJ_TEST("Userland pipe gather write pump") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out);

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // Nothing has been read from pipe2.in yet, so the test expects the write to still be pending.
  KJ_EXPECT(!promise.poll(ws));
  // NOTE(review): expectRead() is defined elsewhere in this file; presumably it reads from the
  // stream and checks the bytes against the given text -- confirm.
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  // Drop the write end of the first pipe before waiting for the pump to finish.
  pipe.out = nullptr;
  KJ_EXPECT(pumpPromise.wait(ws) == 6);
}

// Same setup as the plain gather-write pump test, but the reader consumes the data in two
// reads ("foo" then "bar"), i.e. the split falls exactly between the two written parts.
KJ_TEST("Userland pipe gather write pump split on buffer boundary") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out);

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foo").wait(ws);
  expectRead(*pipe2.in, "bar").wait(ws);
  promise.wait(ws);

  // Drop the write end of the first pipe before waiting for the pump to finish.
  pipe.out = nullptr;
  KJ_EXPECT(pumpPromise.wait(ws) == 6);
}

// Same setup, but the reader's two reads ("fo" then "obar") split inside the first written
// part.
KJ_TEST("Userland pipe gather write pump split mid-first-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out);

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "fo").wait(ws);
  expectRead(*pipe2.in, "obar").wait(ws);
  promise.wait(ws);

  // Drop the write end of the first pipe before waiting for the pump to finish.
  pipe.out = nullptr;
  KJ_EXPECT(pumpPromise.wait(ws) == 6);
}

// Same setup, but the reader's two reads ("foob" then "ar") split inside the second written
// part.
KJ_TEST("Userland pipe gather write pump split mid-second-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out);

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foob").wait(ws);
  expectRead(*pipe2.in, "ar").wait(ws);
  promise.wait(ws);

  // Drop the write end of the first pipe before waiting for the pump to finish.
  pipe.out = nullptr;
  KJ_EXPECT(pumpPromise.wait(ws) == 6);
}

// Here the *pump* is split rather than the read: a pump limited to 3 bytes is chained to a
// second pump limited to 3 bytes, so the pump boundary coincides with the boundary between
// the two written parts. The reader consumes all six bytes in one expectRead().
KJ_TEST("Userland pipe gather write split pump on buffer boundary") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3)
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 3-byte limit.
    KJ_EXPECT(i == 3);
    return pipe.in->pumpTo(*pipe2.out, 3);
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 3.
  KJ_EXPECT(pumpPromise.wait(ws) == 3);
}

// Split pump with limits 2 then 4, so the pump boundary falls inside the first written part.
KJ_TEST("Userland pipe gather write split pump mid-first-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2)
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 2-byte limit.
    KJ_EXPECT(i == 2);
    return pipe.in->pumpTo(*pipe2.out, 4);
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 4.
  KJ_EXPECT(pumpPromise.wait(ws) == 4);
}

// Split pump with limits 4 then 2, so the pump boundary falls inside the second written part.
KJ_TEST("Userland pipe gather write split pump mid-second-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4)
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 4-byte limit.
    KJ_EXPECT(i == 4);
    return pipe.in->pumpTo(*pipe2.out, 2);
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 2.
  KJ_EXPECT(pumpPromise.wait(ws) == 2);
}

// Mirror of the gather-write pump test, but the pump is started from the output side via
// pipe2.out->tryPumpFrom(*pipe.in). KJ_ASSERT_NONNULL requires tryPumpFrom() to return a
// non-null result here.
KJ_TEST("Userland pipe gather write pumpFrom") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // Start one more 1-byte read on the far end and poll it so the event loop runs.
  char c;
  auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
  eofPromise.poll(ws);  // force pump to notice EOF

  KJ_EXPECT(pumpPromise.wait(ws) == 6);
  // Once pipe2's write end is dropped too, the pending read is expected to return 0 bytes.
  pipe2.out = nullptr;
  KJ_EXPECT(eofPromise.wait(ws) == 0);
}

// tryPumpFrom() variant where the reader consumes the data as "foo" then "bar", i.e. split
// exactly between the two written parts.
KJ_TEST("Userland pipe gather write pumpFrom split on buffer boundary") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foo").wait(ws);
  expectRead(*pipe2.in, "bar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // Start one more 1-byte read on the far end and poll it so the event loop runs.
  char c;
  auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
  eofPromise.poll(ws);  // force pump to notice EOF

  KJ_EXPECT(pumpPromise.wait(ws) == 6);
  // Once pipe2's write end is dropped too, the pending read is expected to return 0 bytes.
  pipe2.out = nullptr;
  KJ_EXPECT(eofPromise.wait(ws) == 0);
}

// tryPumpFrom() variant where the reader's two reads ("fo" then "obar") split inside the
// first written part.
KJ_TEST("Userland pipe gather write pumpFrom split mid-first-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "fo").wait(ws);
  expectRead(*pipe2.in, "obar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // Start one more 1-byte read on the far end and poll it so the event loop runs.
  char c;
  auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
  eofPromise.poll(ws);  // force pump to notice EOF

  KJ_EXPECT(pumpPromise.wait(ws) == 6);
  // Once pipe2's write end is dropped too, the pending read is expected to return 0 bytes.
  pipe2.out = nullptr;
  KJ_EXPECT(eofPromise.wait(ws) == 0);
}

// tryPumpFrom() variant where the reader's two reads ("foob" then "ar") split inside the
// second written part.
KJ_TEST("Userland pipe gather write pumpFrom split mid-second-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foob").wait(ws);
  expectRead(*pipe2.in, "ar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // Start one more 1-byte read on the far end and poll it so the event loop runs.
  char c;
  auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
  eofPromise.poll(ws);  // force pump to notice EOF

  KJ_EXPECT(pumpPromise.wait(ws) == 6);
  // Once pipe2's write end is dropped too, the pending read is expected to return 0 bytes.
  pipe2.out = nullptr;
  KJ_EXPECT(eofPromise.wait(ws) == 0);
}

// Split tryPumpFrom(): a pump limited to 3 bytes is chained to a second one limited to
// 3 bytes, so the pump boundary coincides with the boundary between the two written parts.
KJ_TEST("Userland pipe gather write split pumpFrom on buffer boundary") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3))
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 3-byte limit.
    KJ_EXPECT(i == 3);
    return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3));
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 3.
  KJ_EXPECT(pumpPromise.wait(ws) == 3);
}

// Split tryPumpFrom() with limits 2 then 4, so the pump boundary falls inside the first
// written part.
KJ_TEST("Userland pipe gather write split pumpFrom mid-first-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2))
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 2-byte limit.
    KJ_EXPECT(i == 2);
    return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4));
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 4.
  KJ_EXPECT(pumpPromise.wait(ws) == 4);
}

// Split tryPumpFrom() with limits 4 then 2, so the pump boundary falls inside the second
// written part.
KJ_TEST("Userland pipe gather write split pumpFrom mid-second-buffer") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4))
      .then([&](uint64_t i) {
    // The first pump is expected to have moved exactly its 4-byte limit.
    KJ_EXPECT(i == 4);
    return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2));
  });

  ArrayPtr<const byte> parts[] = {
    "foo"_kj.asBytes(),
    "bar"_kj.asBytes()
  };
  auto promise = pipe.out->write(parts);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  pipe.out = nullptr;
  // The chained promise resolves to the second pump's result, expected to be 2.
  KJ_EXPECT(pumpPromise.wait(ws) == 2);
}

// A two-piece gather write ('a', 'b') is drained by two successive pumps that are each
// limited to a single byte. The test expects the write to remain pending after the first
// pump has completed, and to complete only after the second pump has moved the last byte.
KJ_TEST("Userland pipe pumpTo less than write amount") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);

  // Two one-byte pieces, each pointing at its own stack buffer.
  auto pieces = kj::heapArray<ArrayPtr<const byte>>(2);
  byte a[1] = { 'a' };
  byte b[1] = { 'b' };
  pieces[0] = arrayPtr(a, 1);
  pieces[1] = arrayPtr(b, 1);

  auto writePromise = pipe.out->write(pieces);
  KJ_EXPECT(!writePromise.poll(ws));

  expectRead(*pipe2.in, "a").wait(ws);
  KJ_EXPECT(pumpPromise.wait(ws) == 1);
  // Only one of the two bytes has been pumped, so the write is expected to still be pending.
  KJ_EXPECT(!writePromise.poll(ws));

  // A second one-byte pump moves the remaining piece.
  pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);

  expectRead(*pipe2.in, "b").wait(ws);
  KJ_EXPECT(pumpPromise.wait(ws) == 1);
  writePromise.wait(ws);
}

// An unlimited tryPumpFrom() moves a single six-byte write. The pump is expected to still be
// pending after the data has been read; it is then expected to finish with 6 bytes once both
// the source's write end and the destination's read end have been dropped.
// NOTE(review): going by the test name, dropping pipe2.in is what exercises the abortRead()
// path -- confirm against the pipe implementation.
KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
  kj::EventLoop loop;
  WaitScope ws(loop);

  auto pipe = newOneWayPipe();
  auto pipe2 = newOneWayPipe();
  auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));

  auto promise = pipe.out->write("foobar", 6);
  // The write is expected to be pending until the reader consumes the data.
  KJ_EXPECT(!promise.poll(ws));
  expectRead(*pipe2.in, "foobar").wait(ws);
  promise.wait(ws);

  // The source's write end is still open, so the pump is expected not to have finished.
  KJ_EXPECT(!pumpPromise.poll(ws));
  pipe.out = nullptr;
  pipe2.in = nullptr;  // force pump to notice EOF
  KJ_EXPECT(pumpPromise.wait(ws) == 6);
  pipe2.out = nullptr;
}

}  // namespace
}  // namespace kj