// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if !_WIN32
// For Win32 implementation, see async-io-win32.c++.

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "async-io.h"
#include "async-io-internal.h"
#include "async-unix.h"
#include "debug.h"
#include "thread.h"
#include "io.h"
#include "miniposix.h"
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <set>
#include <poll.h>
#include <limits.h>
#include <sys/ioctl.h>

namespace kj {

namespace {

void setNonblocking(int fd) {
#ifdef FIONBIO
  int opt = 1;
  KJ_SYSCALL(ioctl(fd, FIONBIO, &opt));
#else
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
  if ((flags & O_NONBLOCK) == 0) {
    KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
  }
#endif
}

void setCloseOnExec(int fd) {
#ifdef FIOCLEX
  KJ_SYSCALL(ioctl(fd, FIOCLEX));
#else
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFD));
  if ((flags & FD_CLOEXEC) == 0) {
    KJ_SYSCALL(fcntl(fd, F_SETFD, flags | FD_CLOEXEC));
  }
#endif
}

static constexpr uint NEW_FD_FLAGS =
#if __linux__ && !__BIONIC__
    LowLevelAsyncIoProvider::ALREADY_CLOEXEC | LowLevelAsyncIoProvider::ALREADY_NONBLOCK |
#endif
    LowLevelAsyncIoProvider::TAKE_OWNERSHIP;
// We always try to open FDs with CLOEXEC and NONBLOCK already set on Linux, but on other platforms
// this is not possible.
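//
// Illustrative sketch (not used in this file; assumes a LowLevelAsyncIoProvider `lowLevel` is in
// scope): a caller creating its own socket on Linux would pass NEW_FD_FLAGS so that ownership is
// transferred and the redundant fcntl() calls are skipped:
//
//   int sockFd;
//   KJ_SYSCALL(sockFd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0));
//   auto stream = lowLevel.wrapSocketFd(sockFd, NEW_FD_FLAGS);
//
// On platforms without SOCK_NONBLOCK/SOCK_CLOEXEC, NEW_FD_FLAGS omits the ALREADY_* bits, and
// OwnedFileDescriptor (below) falls back to setNonblocking()/setCloseOnExec().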

class OwnedFileDescriptor {
public:
  OwnedFileDescriptor(int fd, uint flags): fd(fd), flags(flags) {
    if (flags & LowLevelAsyncIoProvider::ALREADY_NONBLOCK) {
      KJ_DREQUIRE(fcntl(fd, F_GETFL) & O_NONBLOCK, "You claimed you set NONBLOCK, but you didn't.");
    } else {
      setNonblocking(fd);
    }

    if (flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) {
      if (flags & LowLevelAsyncIoProvider::ALREADY_CLOEXEC) {
        KJ_DREQUIRE(fcntl(fd, F_GETFD) & FD_CLOEXEC,
                    "You claimed you set CLOEXEC, but you didn't.");
      } else {
        setCloseOnExec(fd);
      }
    }
  }

  ~OwnedFileDescriptor() noexcept(false) {
    // Don't use SYSCALL() here because close() should not be repeated on EINTR.
    if ((flags & LowLevelAsyncIoProvider::TAKE_OWNERSHIP) && close(fd) < 0) {
      KJ_FAIL_SYSCALL("close", errno, fd) {
        // Recoverable exceptions are safe in destructors.
        break;
      }
    }
  }

protected:
  const int fd;

private:
  uint flags;
};

// =======================================================================================

class AsyncStreamFd: public OwnedFileDescriptor, public AsyncCapabilityStream {
public:
  AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
      : OwnedFileDescriptor(fd, flags),
        eventPort(eventPort),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
  virtual ~AsyncStreamFd() noexcept(false) {}

  Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    return tryReadInternal(buffer, minBytes, maxBytes, nullptr, 0, {0,0})
        .then([](ReadResult r) { return r.byteCount; });
  }

  Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
                                     AutoCloseFd* fdBuffer, size_t maxFds) override {
    return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, {0,0});
  }

  Promise<ReadResult> tryReadWithStreams(
      void* buffer, size_t minBytes, size_t maxBytes,
      Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) override {
    auto fdBuffer = kj::heapArray<AutoCloseFd>(maxStreams);
    auto promise = tryReadInternal(buffer, minBytes, maxBytes, fdBuffer.begin(), maxStreams, {0,0});

    return promise.then([this, fdBuffer = kj::mv(fdBuffer), streamBuffer]
                        (ReadResult result) mutable {
      for (auto i: kj::zeroTo(result.capCount)) {
        streamBuffer[i] = kj::heap<AsyncStreamFd>(eventPort, fdBuffer[i].release(),
            LowLevelAsyncIoProvider::TAKE_OWNERSHIP | LowLevelAsyncIoProvider::ALREADY_CLOEXEC);
      }
      return result;
    });
  }

  Promise<void> write(const void* buffer, size_t size) override {
    ssize_t n;
    KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
      // Error.

      // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
      // a bug that exists in both Clang and GCC:
      //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
      //   http://llvm.org/bugs/show_bug.cgi?id=12286
      goto error;
    }
    if (false) {
    error:
      return kj::READY_NOW;
    }

    if (n < 0) {
      // EAGAIN -- need to wait for writability and try again.
      return observer.whenBecomesWritable().then([=]() {
        return write(buffer, size);
      });
    } else if (n == size) {
      // All done.
      return READY_NOW;
    } else {
      // Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
      // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
      // for non-blocking operations. So, we'll need to write() again now, even though it will
      // almost certainly fail with EAGAIN. See comments in the read path for more info.
      buffer = reinterpret_cast<const byte*>(buffer) + n;
      size -= n;
      return write(buffer, size);
    }
  }

  Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    if (pieces.size() == 0) {
      return writeInternal(nullptr, nullptr, nullptr);
    } else {
      return writeInternal(pieces[0], pieces.slice(1, pieces.size()), nullptr);
    }
  }

  Promise<void> writeWithFds(ArrayPtr<const byte> data,
                             ArrayPtr<const ArrayPtr<const byte>> moreData,
                             ArrayPtr<const int> fds) override {
    return writeInternal(data, moreData, fds);
  }

  Promise<void> writeWithStreams(ArrayPtr<const byte> data,
                                 ArrayPtr<const ArrayPtr<const byte>> moreData,
                                 Array<Own<AsyncCapabilityStream>> streams) override {
    auto fds = KJ_MAP(stream, streams) {
      return downcast<AsyncStreamFd>(*stream).fd;
    };
    auto promise = writeInternal(data, moreData, fds);
    return promise.attach(kj::mv(fds));
  }

  Promise<void> whenWriteDisconnected() override {
    KJ_IF_MAYBE(p, writeDisconnectedPromise) {
      return p->addBranch();
    } else {
      auto fork = observer.whenWriteDisconnected().fork();
      auto result = fork.addBranch();
      writeDisconnectedPromise = kj::mv(fork);
      return kj::mv(result);
    }
  }

  void shutdownWrite() override {
    // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    // UnixAsyncIoProvider interface.
    KJ_SYSCALL(shutdown(fd, SHUT_WR));
  }

  void abortRead() override {
    // There's no legitimate way to get an AsyncStreamFd that isn't a socket through the
    // UnixAsyncIoProvider interface.
    KJ_SYSCALL(shutdown(fd, SHUT_RD));
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }

  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }
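
  // Illustrative sketch (example usage only, not part of this file): a caller holding a stream
  // backed by this class can tweak socket options through the getsockopt()/setsockopt() wrappers
  // above, e.g. to enable TCP keepalive:
  //
  //   int enable = 1;
  //   stream->setsockopt(SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));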

  void getsockname(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockname(fd, addr, &socklen));
    *length = socklen;
  }

  void getpeername(struct sockaddr* addr, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getpeername(fd, addr, &socklen));
    *length = socklen;
  }

  Promise<void> waitConnected() {
    // Wait until initial connection has completed. This actually just waits until it is writable.

    // Can't just go directly to observer.whenBecomesWritable() because of edge triggering. We
    // need to explicitly check if the socket is already connected.

    struct pollfd pollfd;
    memset(&pollfd, 0, sizeof(pollfd));
    pollfd.fd = fd;
    pollfd.events = POLLOUT;

    int pollResult;
    KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));

    if (pollResult == 0) {
      // Not ready yet. We can safely use the edge-triggered observer.
      return observer.whenBecomesWritable();
    } else {
      // Ready now.
      return kj::READY_NOW;
    }
  }

private:
  UnixEventPort& eventPort;
  UnixEventPort::FdObserver observer;
  Maybe<ForkedPromise<void>> writeDisconnectedPromise;

  Promise<ReadResult> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
                                      AutoCloseFd* fdBuffer, size_t maxFds,
                                      ReadResult alreadyRead) {
    // `alreadyRead` is the number of bytes we have already received via previous reads -- minBytes,
    // maxBytes, and buffer have already been adjusted to account for them, but this count must
    // be included in the final return value.

    ssize_t n;
    if (maxFds == 0) {
      KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    } else {
      struct msghdr msg;
      memset(&msg, 0, sizeof(msg));

      struct iovec iov;
      memset(&iov, 0, sizeof(iov));
      iov.iov_base = buffer;
      iov.iov_len = maxBytes;
      msg.msg_iov = &iov;
      msg.msg_iovlen = 1;

      // Allocate space to receive a cmsg.
#if __APPLE__ || __FreeBSD__
      // Until very recently (late 2018 / early 2019), FreeBSD suffered from a bug in which when
      // an SCM_RIGHTS message was truncated on delivery, it would not close the FDs that weren't
      // delivered -- they would simply leak: https://bugs.freebsd.org/131876
      //
      // My testing indicates that MacOS has this same bug as of today (April 2019). I don't know
      // if they plan to fix it or are even aware of it.
      //
      // To handle both cases, we will always provide space to receive 512 FDs. Hopefully, this is
      // greater than the maximum number of FDs that these kernels will transmit in one message
      // PLUS enough space for any other ancillary messages that could be sent before the
      // SCM_RIGHTS message to push it back in the buffer. I couldn't find any firm documentation
      // on these limits, though -- I only know that Linux is limited to 253, and I saw a hint in
      // a comment in someone else's application that suggested FreeBSD is the same. Hopefully,
      // then, this is sufficient to prevent attacks. But if not, there's nothing more we can do;
      // it's really up to the kernel to fix this.
      size_t msgBytes = CMSG_SPACE(sizeof(int) * 512);
#else
      size_t msgBytes = CMSG_SPACE(sizeof(int) * maxFds);
#endif
      // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
      // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
      // surprisingly end up with space for two file descriptors when you only wanted one. However,
      // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
      // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
      // other platforms), so we want to allocate an array of words (we use void*). So... we use
      // CMSG_SPACE() and then additionally round up to deal with Mac.
      size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
      KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
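      // Worked example (assuming a typical 64-bit Linux, where sizeof(struct cmsghdr) is 16 and
      // CMSG_ALIGN rounds up to 8 bytes): with maxFds == 1, msgBytes = CMSG_SPACE(4) = 24 and
      // msgWords = 3. The alignment padding leaves room for two ints even though only one FD was
      // requested -- which is why the loop below must close any extras the kernel delivers.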
      auto cmsgBytes = cmsgSpace.asBytes();
      memset(cmsgBytes.begin(), 0, cmsgBytes.size());
      msg.msg_control = cmsgBytes.begin();
      msg.msg_controllen = msgBytes;

#ifdef MSG_CMSG_CLOEXEC
      static constexpr int RECVMSG_FLAGS = MSG_CMSG_CLOEXEC;
#else
      static constexpr int RECVMSG_FLAGS = 0;
#endif

      KJ_NONBLOCKING_SYSCALL(n = ::recvmsg(fd, &msg, RECVMSG_FLAGS)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }

      if (n >= 0) {
        // Process all messages.
        //
        // WARNING DANGER: We have to be VERY careful not to miss a file descriptor here, because
        // if we do, then that FD will never be closed, and a malicious peer could exploit this to
        // fill up our FD table, creating a DoS attack. Some things to keep in mind:
        // - CMSG_SPACE() could have rounded up the space for alignment purposes, and this could
        //   mean we permitted the kernel to deliver more file descriptors than `maxFds`. We need
        //   to close the extras.
        // - We can receive multiple ancillary messages at once. In particular, there is also
        //   SCM_CREDENTIALS. The sender decides what to send. They could send SCM_CREDENTIALS
        //   first followed by SCM_RIGHTS. We need to make sure we see both.
        size_t nfds = 0;
        size_t spaceLeft = msg.msg_controllen;
        for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
            cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
          if (spaceLeft >= CMSG_LEN(0) &&
              cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
            // Some operating systems (like MacOS) do not adjust cmsg_len when the message is
            // truncated. We must do so ourselves or risk overrunning the buffer.
            auto len = kj::min(cmsg->cmsg_len, spaceLeft);
            auto data = arrayPtr(reinterpret_cast<int*>(CMSG_DATA(cmsg)),
                                 (len - CMSG_LEN(0)) / sizeof(int));
            kj::Vector<kj::AutoCloseFd> trashFds;
            for (auto fd: data) {
              kj::AutoCloseFd ownFd(fd);
              if (nfds < maxFds) {
                fdBuffer[nfds++] = kj::mv(ownFd);
              } else {
                trashFds.add(kj::mv(ownFd));
              }
            }
          }
        }

#ifndef MSG_CMSG_CLOEXEC
        for (size_t i = 0; i < nfds; i++) {
          setCloseOnExec(fdBuffer[i]);
        }
#endif

        alreadyRead.capCount += nfds;
        fdBuffer += nfds;
        maxFds -= nfds;
      }
    }

    if (false) {
    error:
      return alreadyRead;
    }

    if (n < 0) {
      // Read would block.
      return observer.whenBecomesReadable().then([=]() {
        return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
      });
    } else if (n == 0) {
      // EOF -OR- maxBytes == 0.
      return alreadyRead;
    } else if (implicitCast<size_t>(n) >= minBytes) {
      // We read enough to stop here.
      alreadyRead.byteCount += n;
      return alreadyRead;
    } else {
      // The kernel returned fewer bytes than we asked for (and fewer than we need).

      buffer = reinterpret_cast<byte*>(buffer) + n;
      minBytes -= n;
      maxBytes -= n;
      alreadyRead.byteCount += n;

      // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
      // we've consumed the whole read buffer here. If a signal is delivered in the middle of a
      // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
      // result, with data still in the buffer.
      //     https://bugzilla.kernel.org/show_bug.cgi?id=199131
      //     https://twitter.com/CaptainSegfault/status/1112622245531144194
      //
      // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
      // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
      // non-null) to avoid a redundant call to read(). Alas...
      return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
    }
  }

  Promise<void> writeInternal(ArrayPtr<const byte> firstPiece,
                              ArrayPtr<const ArrayPtr<const byte>> morePieces,
                              ArrayPtr<const int> fds) {
    const size_t iovmax = kj::miniposix::iovMax(1 + morePieces.size());
    // If there are more than IOV_MAX pieces, we'll only write the first IOV_MAX for now, and
    // then we'll loop later.
    KJ_STACK_ARRAY(struct iovec, iov, kj::min(1 + morePieces.size(), iovmax), 16, 128);
    size_t iovTotal = 0;

    // writev() interface is not const-correct.  :(
    iov[0].iov_base = const_cast<byte*>(firstPiece.begin());
    iov[0].iov_len = firstPiece.size();
    iovTotal += iov[0].iov_len;
    for (uint i = 1; i < iov.size(); i++) {
      iov[i].iov_base = const_cast<byte*>(morePieces[i - 1].begin());
      iov[i].iov_len = morePieces[i - 1].size();
      iovTotal += iov[i].iov_len;
    }

    if (iovTotal == 0) {
      KJ_REQUIRE(fds.size() == 0, "can't write FDs without bytes");
      return kj::READY_NOW;
    }

    ssize_t n;
    if (fds.size() == 0) {
      KJ_NONBLOCKING_SYSCALL(n = ::writev(fd, iov.begin(), iov.size())) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    } else {
      struct msghdr msg;
      memset(&msg, 0, sizeof(msg));
      msg.msg_iov = iov.begin();
      msg.msg_iovlen = iov.size();

      // Allocate space for the cmsg that will carry the FDs.
      size_t msgBytes = CMSG_SPACE(sizeof(int) * fds.size());
      // On Linux, CMSG_SPACE will align to a word-size boundary, but on Mac it always aligns to a
      // 32-bit boundary. I guess aligning to 32 bits helps avoid the problem where you
      // surprisingly end up with space for two file descriptors when you only wanted one. However,
      // cmsghdr's preferred alignment is word-size (it contains a size_t). If we stack-allocate
      // the buffer, we need to make sure it is aligned properly (maybe not on x64, but maybe on
      // other platforms), so we want to allocate an array of words (we use void*). So... we use
      // CMSG_SPACE() and then additionally round up to deal with Mac.
      size_t msgWords = (msgBytes + sizeof(void*) - 1) / sizeof(void*);
      KJ_STACK_ARRAY(void*, cmsgSpace, msgWords, 16, 256);
      auto cmsgBytes = cmsgSpace.asBytes();
      memset(cmsgBytes.begin(), 0, cmsgBytes.size());
      msg.msg_control = cmsgBytes.begin();
      msg.msg_controllen = msgBytes;

      struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
      cmsg->cmsg_level = SOL_SOCKET;
      cmsg->cmsg_type = SCM_RIGHTS;
      cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fds.size());
      memcpy(CMSG_DATA(cmsg), fds.begin(), fds.asBytes().size());

      KJ_NONBLOCKING_SYSCALL(n = ::sendmsg(fd, &msg, 0)) {
        // Error.

        // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
        // a bug that exists in both Clang and GCC:
        //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
        //   http://llvm.org/bugs/show_bug.cgi?id=12286
        goto error;
      }
    }

    if (false) {
    error:
      return kj::READY_NOW;
    }

    if (n < 0) {
      // Got EAGAIN. Nothing was written.
      return observer.whenBecomesWritable().then([=]() {
        return writeInternal(firstPiece, morePieces, fds);
      });
    } else if (n == 0) {
      // Why would a sendmsg() with a non-empty message ever return 0 when writing to a stream
      // socket? If there's no room in the send buffer, it should fail with EAGAIN. If the
      // connection is closed, it should fail with EPIPE. Various documents and forum posts around
      // the internet claim this can happen but no one seems to know when. My guess is it can only
      // happen if we try to send an empty message -- which we didn't. So I think this is
      // impossible. If it is possible, we need to figure out how to correctly handle it, which
      // depends on what caused it.
      //
      // Note in particular that if 0 is a valid return here, and we sent an SCM_RIGHTS message,
      // we need to know whether the message was sent or not, in order to decide whether to retry
      // sending it!
      KJ_FAIL_ASSERT("non-empty sendmsg() returned 0");
    }

    // Non-zero bytes were written. This also implies that *all* FDs were written.

    // Discard all data that was written, then issue a new write for what's left (if any).
    for (;;) {
      if (n < firstPiece.size()) {
        // Only part of the first piece was consumed.  We'll need to write the rest.
        firstPiece = firstPiece.slice(n, firstPiece.size());
        iovTotal -= n;

        if (iovTotal == 0) {
          // Oops, what actually happened is that we hit the IOV_MAX limit. Don't wait.
          return writeInternal(firstPiece, morePieces, nullptr);
        }

        // As with read(), we cannot assume that a short write() really means the write buffer is
        // full (see comments in the read path above). We have to write again.
        return writeInternal(firstPiece, morePieces, nullptr);
      } else if (morePieces.size() == 0) {
        // First piece was fully-consumed and there are no more pieces, so we're done.
        KJ_DASSERT(n == firstPiece.size(), n);
        return READY_NOW;
      } else {
        // First piece was fully consumed, so move on to the next piece.
        n -= firstPiece.size();
        iovTotal -= firstPiece.size();
        firstPiece = morePieces[0];
        morePieces = morePieces.slice(1, morePieces.size());
      }
    }
  }
};

// =======================================================================================

class SocketAddress {
public:
  SocketAddress(const void* sockaddr, uint len): addrlen(len) {
    KJ_REQUIRE(len <= sizeof(addr), "Sorry, your sockaddr is too big for me.");
    memcpy(&addr.generic, sockaddr, len);
  }

  bool operator<(const SocketAddress& other) const {
    // So we can use std::set<SocketAddress>...  see DNS lookup code.

    if (wildcard < other.wildcard) return true;
    if (wildcard > other.wildcard) return false;

    if (addrlen < other.addrlen) return true;
    if (addrlen > other.addrlen) return false;

    return memcmp(&addr.generic, &other.addr.generic, addrlen) < 0;
  }

  const struct sockaddr* getRaw() const { return &addr.generic; }
  socklen_t getRawSize() const { return addrlen; }

  int socket(int type) const {
    bool isStream = type == SOCK_STREAM;

    int result;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));

    if (isStream && (addr.generic.sa_family == AF_INET ||
                     addr.generic.sa_family == AF_INET6)) {
      // TODO(perf):  As a hack for the 0.4 release we are always setting
      //   TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
      //   RPC protocol.  Later, we should extend the interface to provide more
      //   control over this.  Perhaps write() should have a flag which
      //   specifies whether to pass MSG_MORE.
      int one = 1;
      KJ_SYSCALL(setsockopt(
          result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
    }

    return result;
  }

  void bind(int sockfd) const {
#if !defined(__OpenBSD__)
    if (wildcard) {
      // Disable IPV6_V6ONLY because we want to handle both ipv4 and ipv6 on this socket.  (The
      // default value of this option varies across platforms.)
      int value = 0;
      KJ_SYSCALL(setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)));
    }
#endif

    KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
  }

  uint getPort() const {
    switch (addr.generic.sa_family) {
      case AF_INET: return ntohs(addr.inet4.sin_port);
      case AF_INET6: return ntohs(addr.inet6.sin6_port);
      default: return 0;
    }
  }

  String toString() const {
    if (wildcard) {
      return str("*:", getPort());
    }

    switch (addr.generic.sa_family) {
      case AF_INET: {
        char buffer[INET6_ADDRSTRLEN];
        if (inet_ntop(addr.inet4.sin_family, &addr.inet4.sin_addr,
                      buffer, sizeof(buffer)) == nullptr) {
          KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
          return heapString("(inet_ntop error)");
        }
        return str(buffer, ':', ntohs(addr.inet4.sin_port));
      }
      case AF_INET6: {
        char buffer[INET6_ADDRSTRLEN];
        if (inet_ntop(addr.inet6.sin6_family, &addr.inet6.sin6_addr,
                      buffer, sizeof(buffer)) == nullptr) {
          KJ_FAIL_SYSCALL("inet_ntop", errno) { break; }
          return heapString("(inet_ntop error)");
        }
        return str('[', buffer, "]:", ntohs(addr.inet6.sin6_port));
      }
      case AF_UNIX: {
        auto path = _::safeUnixPath(&addr.unixDomain, addrlen);
        if (path.size() > 0 && path[0] == '\0') {
          return str("unix-abstract:", path.slice(1, path.size()));
        } else {
          return str("unix:", path);
        }
      }
      default:
        return str("(unknown address family ", addr.generic.sa_family, ")");
    }
  }

  static Promise<Array<SocketAddress>> lookupHost(
      LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
      _::NetworkFilter& filter);
  // Perform a DNS lookup.

  static Promise<Array<SocketAddress>> parse(
      LowLevelAsyncIoProvider& lowLevel, StringPtr str, uint portHint, _::NetworkFilter& filter) {
    // TODO(someday):  Allow commas in `str`.
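    //
    // Examples of address strings accepted by the parsing below (illustrative):
    //   "unix:/var/run/my.sock"  -- Unix domain socket path
    //   "unix-abstract:my-name"  -- abstract-namespace Unix domain socket
    //   "1.2.3.4:80"             -- IPv4 address with port
    //   "[2001:db8::1]:80"       -- IPv6 address with port (brackets required when a port is given)
    //   "2001:db8::1"            -- bare IPv6 address; portHint supplies the port
    //   "*:80"                   -- wildcard bind address
    //   "example.com:http"       -- anything else falls back to DNS / service-name lookup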

    SocketAddress result;

    if (str.startsWith("unix:")) {
      StringPtr path = str.slice(strlen("unix:"));
      KJ_REQUIRE(path.size() < sizeof(addr.unixDomain.sun_path),
                 "Unix domain socket address is too long.", str);
      KJ_REQUIRE(path.size() == strlen(path.cStr()),
                 "Unix domain socket address contains NULL. Use"
                 " 'unix-abstract:' for the abstract namespace.");
      result.addr.unixDomain.sun_family = AF_UNIX;
      strcpy(result.addr.unixDomain.sun_path, path.cStr());
      result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;

      if (!result.parseAllowedBy(filter)) {
        KJ_FAIL_REQUIRE("unix sockets blocked by restrictPeers()");
        return Array<SocketAddress>();
      }

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    if (str.startsWith("unix-abstract:")) {
      StringPtr path = str.slice(strlen("unix-abstract:"));
      KJ_REQUIRE(path.size() + 1 < sizeof(addr.unixDomain.sun_path),
                 "Unix domain socket address is too long.", str);
      result.addr.unixDomain.sun_family = AF_UNIX;
      result.addr.unixDomain.sun_path[0] = '\0';
      // although not strictly required by Linux, also copy the trailing
      // NULL terminator so that we can safely read it back in toString
      memcpy(result.addr.unixDomain.sun_path + 1, path.cStr(), path.size() + 1);
      result.addrlen = offsetof(struct sockaddr_un, sun_path) + path.size() + 1;

      if (!result.parseAllowedBy(filter)) {
        KJ_FAIL_REQUIRE("abstract unix sockets blocked by restrictPeers()");
        return Array<SocketAddress>();
      }

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    // Try to separate the address and port.
    ArrayPtr<const char> addrPart;
    Maybe<StringPtr> portPart;

    int af;

    if (str.startsWith("[")) {
      // Address starts with a bracket, which is a common way to write an ip6 address with a port,
      // since without brackets around the address part, the port looks like another segment of
      // the address.
      af = AF_INET6;
      size_t closeBracket = KJ_ASSERT_NONNULL(str.findLast(']'),
          "Unclosed '[' in address string.", str);

      addrPart = str.slice(1, closeBracket);
      if (str.size() > closeBracket + 1) {
        KJ_REQUIRE(str.slice(closeBracket + 1).startsWith(":"),
                   "Expected port suffix after ']'.", str);
        portPart = str.slice(closeBracket + 2);
      }
    } else {
      KJ_IF_MAYBE(colon, str.findFirst(':')) {
        if (str.slice(*colon + 1).findFirst(':') == nullptr) {
          // There is exactly one colon and no brackets, so it must be an ip4 address with port.
          af = AF_INET;
          addrPart = str.slice(0, *colon);
          portPart = str.slice(*colon + 1);
        } else {
          // There are two or more colons and no brackets, so the whole thing must be an ip6
          // address with no port.
          af = AF_INET6;
          addrPart = str;
        }
      } else {
        // No colons, so it must be an ip4 address without port.
        af = AF_INET;
        addrPart = str;
      }
    }

    // Parse the port.
    unsigned long port;
    KJ_IF_MAYBE(portText, portPart) {
      char* endptr;
      port = strtoul(portText->cStr(), &endptr, 0);
      if (portText->size() == 0 || *endptr != '\0') {
        // Not a number.  Maybe it's a service name.  Fall back to DNS.
        return lookupHost(lowLevel, kj::heapString(addrPart), kj::heapString(*portText), portHint,
                          filter);
      }
      KJ_REQUIRE(port < 65536, "Port number too large.");
    } else {
      port = portHint;
    }

    // Check for wildcard.
    if (addrPart.size() == 1 && addrPart[0] == '*') {
      result.wildcard = true;
#if defined(__OpenBSD__)
      // On OpenBSD, all sockets are either v4-only or v6-only, so use v4 as a
      // temporary workaround for wildcards.
      result.addrlen = sizeof(addr.inet4);
      result.addr.inet4.sin_family = AF_INET;
      result.addr.inet4.sin_port = htons(port);
#else
      // Create an ip6 socket and set IPV6_V6ONLY to 0 later.
      result.addrlen = sizeof(addr.inet6);
      result.addr.inet6.sin6_family = AF_INET6;
      result.addr.inet6.sin6_port = htons(port);
#endif

      auto array = kj::heapArrayBuilder<SocketAddress>(1);
      array.add(result);
      return array.finish();
    }

    void* addrTarget;
    if (af == AF_INET6) {
      result.addrlen = sizeof(addr.inet6);
      result.addr.inet6.sin6_family = AF_INET6;
      result.addr.inet6.sin6_port = htons(port);
      addrTarget = &result.addr.inet6.sin6_addr;
    } else {
      result.addrlen = sizeof(addr.inet4);
      result.addr.inet4.sin_family = AF_INET;
      result.addr.inet4.sin_port = htons(port);
      addrTarget = &result.addr.inet4.sin_addr;
    }

    if (addrPart.size() < INET6_ADDRSTRLEN - 1) {
      // addrPart is not necessarily NUL-terminated so we have to make a copy.  :(
      char buffer[INET6_ADDRSTRLEN];
      memcpy(buffer, addrPart.begin(), addrPart.size());
      buffer[addrPart.size()] = '\0';

      // OK, parse it!
      switch (inet_pton(af, buffer, addrTarget)) {
        case 1: {
          // success.
          if (!result.parseAllowedBy(filter)) {
            KJ_FAIL_REQUIRE("address family blocked by restrictPeers()");
            return Array<SocketAddress>();
          }

          auto array = kj::heapArrayBuilder<SocketAddress>(1);
          array.add(result);
          return array.finish();
        }
        case 0:
          // It's apparently not a simple address...  fall back to DNS.
          break;
        default:
          KJ_FAIL_SYSCALL("inet_pton", errno, af, addrPart);
      }
    }

    return lookupHost(lowLevel, kj::heapString(addrPart), nullptr, port, filter);
  }

  static SocketAddress getLocalAddress(int sockfd) {
    SocketAddress result;
    result.addrlen = sizeof(addr);
    KJ_SYSCALL(getsockname(sockfd, &result.addr.generic, &result.addrlen));
    return result;
  }

  bool allowedBy(LowLevelAsyncIoProvider::NetworkFilter& filter) {
    return filter.shouldAllow(&addr.generic, addrlen);
  }

  bool parseAllowedBy(_::NetworkFilter& filter) {
    return filter.shouldAllowParse(&addr.generic, addrlen);
883 884
  }

private:
  SocketAddress() {
    // We need to memset the whole object to 0, otherwise Valgrind gets unhappy when we write it to a
    // pipe, due to the padding bytes being uninitialized.
    memset(this, 0, sizeof(*this));
  }

  socklen_t addrlen;
  bool wildcard = false;
  union {
    struct sockaddr generic;
    struct sockaddr_in inet4;
    struct sockaddr_in6 inet6;
    struct sockaddr_un unixDomain;
    struct sockaddr_storage storage;
  } addr;

  struct LookupParams;
  class LookupReader;
};

class SocketAddress::LookupReader {
  // Reads SocketAddresses off of a pipe coming from another thread that is performing
  // getaddrinfo.

public:
  LookupReader(kj::Own<Thread>&& thread, kj::Own<AsyncInputStream>&& input,
               _::NetworkFilter& filter)
      : thread(kj::mv(thread)), input(kj::mv(input)), filter(filter) {}

  ~LookupReader() {
    if (thread) thread->detach();
  }

  Promise<Array<SocketAddress>> read() {
    return input->tryRead(&current, sizeof(current), sizeof(current)).then(
        [this](size_t n) -> Promise<Array<SocketAddress>> {
      if (n < sizeof(current)) {
        thread = nullptr;
        // getaddrinfo()'s docs seem to say it will never return an empty list, but let's check
        // anyway.
        KJ_REQUIRE(addresses.size() > 0, "DNS lookup returned no permitted addresses.") { break; }
        return addresses.releaseAsArray();
      } else {
        // getaddrinfo() can return multiple copies of the same address for several reasons.
        // A major one is that we don't give it a socket type (SOCK_STREAM vs. SOCK_DGRAM), so
        // it may return two copies of the same address, one for each type, unless it explicitly
        // knows that the service name given is specific to one type.  But we can't tell it a type,
        // because we don't actually know which one the user wants, and if we specify SOCK_STREAM
        // while the user specified a UDP service name then they'll get a resolution error which
        // is lame.  (At least, I think that's how it works.)
        //
        // So we instead resort to de-duping results.
        if (alreadySeen.insert(current).second) {
          if (current.parseAllowedBy(filter)) {
            addresses.add(current);
          }
        }
        return read();
      }
    });
  }

private:
  kj::Own<Thread> thread;
  kj::Own<AsyncInputStream> input;
  _::NetworkFilter& filter;
  SocketAddress current;
  kj::Vector<SocketAddress> addresses;
  std::set<SocketAddress> alreadySeen;
};

struct SocketAddress::LookupParams {
  kj::String host;
  kj::String service;
};

Promise<Array<SocketAddress>> SocketAddress::lookupHost(
963 964
    LowLevelAsyncIoProvider& lowLevel, kj::String host, kj::String service, uint portHint,
    _::NetworkFilter& filter) {
  // This shitty function spawns a thread to run getaddrinfo().  Unfortunately, getaddrinfo() is
  // the only cross-platform DNS API and it is blocking.
  //
  // TODO(perf):  Use a thread pool?  Maybe kj::Thread should use a thread pool automatically?
  //   Maybe use the various platform-specific asynchronous DNS libraries?  Please do not implement
  //   a custom DNS resolver...
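  //
  // The worker thread serializes each result as a raw SocketAddress struct and writes it to a
  // pipe; LookupReader (above) then reads those fixed-size records back on the event loop thread
  // until EOF.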

  int fds[2];
#if __linux__ && !__BIONIC__
  KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
  KJ_SYSCALL(pipe(fds));
#endif

  auto input = lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS);

  int outFd = fds[1];

  LookupParams params = { kj::mv(host), kj::mv(service) };

  auto thread = heap<Thread>(kj::mvCapture(params, [outFd,portHint](LookupParams&& params) {
    FdOutputStream output((AutoCloseFd(outFd)));

    struct addrinfo* list;
    int status = getaddrinfo(
        params.host == "*" ? nullptr : params.host.cStr(),
        params.service == nullptr ? nullptr : params.service.cStr(),
        nullptr, &list);
    if (status == 0) {
      KJ_DEFER(freeaddrinfo(list));

      struct addrinfo* cur = list;
      while (cur != nullptr) {
        if (params.service == nullptr) {
          switch (cur->ai_addr->sa_family) {
            case AF_INET:
              ((struct sockaddr_in*)cur->ai_addr)->sin_port = htons(portHint);
              break;
            case AF_INET6:
              ((struct sockaddr_in6*)cur->ai_addr)->sin6_port = htons(portHint);
              break;
            default:
              break;
          }
        }

        SocketAddress addr;
        if (params.host == "*") {
          // Set up a wildcard SocketAddress.  Only use the port number returned by getaddrinfo().
          addr.wildcard = true;
          addr.addrlen = sizeof(addr.addr.inet6);
          addr.addr.inet6.sin6_family = AF_INET6;
          switch (cur->ai_addr->sa_family) {
            case AF_INET:
              addr.addr.inet6.sin6_port = ((struct sockaddr_in*)cur->ai_addr)->sin_port;
              break;
            case AF_INET6:
              addr.addr.inet6.sin6_port = ((struct sockaddr_in6*)cur->ai_addr)->sin6_port;
              break;
            default:
              addr.addr.inet6.sin6_port = portHint;
              break;
          }
        } else {
          addr.addrlen = cur->ai_addrlen;
          memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
        }
        KJ_ASSERT_CAN_MEMCPY(SocketAddress);
        output.write(&addr, sizeof(addr));
        cur = cur->ai_next;
      }
    } else if (status == EAI_SYSTEM) {
      KJ_FAIL_SYSCALL("getaddrinfo", errno, params.host, params.service) {
        return;
      }
    } else {
      KJ_FAIL_REQUIRE("DNS lookup failed.",
                      params.host, params.service, gai_strerror(status)) {
        return;
      }
    }
  }));

  auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input), filter);
  return reader->read().attach(kj::mv(reader));
}

// =======================================================================================

class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
public:
  FdConnectionReceiver(UnixEventPort& eventPort, int fd,
                       LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}

  Promise<Own<AsyncIoStream>> accept() override {
    int newFd;

    struct sockaddr_storage addr;
    socklen_t addrlen = sizeof(addr);

  retry:
#if __linux__ && !__BIONIC__
    newFd = ::accept4(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen,
                      SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
    newFd = ::accept(fd, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
#endif

    if (newFd >= 0) {
      if (!filter.shouldAllow(reinterpret_cast<struct sockaddr*>(&addr), addrlen)) {
        // Drop disallowed address.
        close(newFd);
        return accept();
      } else {
        return Own<AsyncIoStream>(heap<AsyncStreamFd>(eventPort, newFd, NEW_FD_FLAGS));
      }
    } else {
      int error = errno;

      switch (error) {
        case EAGAIN:
#if EAGAIN != EWOULDBLOCK
        case EWOULDBLOCK:
#endif
          // Not ready yet.
          return observer.whenBecomesReadable().then([this]() {
            return accept();
          });

        case EINTR:
        case ENETDOWN:
#ifdef EPROTO
        // EPROTO is not defined on OpenBSD.
        case EPROTO:
#endif
        case EHOSTDOWN:
        case EHOSTUNREACH:
        case ENETUNREACH:
        case ECONNABORTED:
        case ETIMEDOUT:
          // According to the Linux man page, accept() may report an error if the accepted
          // connection is already broken.  In this case, we really ought to just ignore it and
          // keep waiting.  But it's hard to say exactly what errors are such network errors and
          // which ones are permanent errors.  We've made a guess here.
          goto retry;

        default:
          KJ_FAIL_SYSCALL("accept", error);
      }

    }
  }

  uint getPort() override {
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }

public:
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};

class DatagramPortImpl final: public DatagramPort, public OwnedFileDescriptor {
public:
  DatagramPortImpl(LowLevelAsyncIoProvider& lowLevel, UnixEventPort& eventPort, int fd,
                   LowLevelAsyncIoProvider::NetworkFilter& filter, uint flags)
      : OwnedFileDescriptor(fd, flags), lowLevel(lowLevel), eventPort(eventPort), filter(filter),
        observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ |
                                UnixEventPort::FdObserver::OBSERVE_WRITE) {}

  Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) override;
  Promise<size_t> send(
      ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) override;

  class ReceiverImpl;

  Own<DatagramReceiver> makeReceiver(DatagramReceiver::Capacity capacity) override;

  uint getPort() override {
    return SocketAddress::getLocalAddress(fd).getPort();
  }

  void getsockopt(int level, int option, void* value, uint* length) override {
    socklen_t socklen = *length;
    KJ_SYSCALL(::getsockopt(fd, level, option, value, &socklen));
    *length = socklen;
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    KJ_SYSCALL(::setsockopt(fd, level, option, value, length));
  }

public:
  LowLevelAsyncIoProvider& lowLevel;
  UnixEventPort& eventPort;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  UnixEventPort::FdObserver observer;
};

class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
public:
  LowLevelAsyncIoProviderImpl()
      : eventLoop(eventPort), waitScope(eventLoop) {}

  inline WaitScope& getWaitScope() { return waitScope; }

  Own<AsyncInputStream> wrapInputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncOutputStream> wrapOutputFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0) override {
    return heap<AsyncStreamFd>(eventPort, fd, flags);
  }
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
    // It's important that we construct the AsyncStreamFd first, so that `flags` are honored,
    // especially setting nonblocking mode and taking ownership.
    auto result = heap<AsyncStreamFd>(eventPort, fd, flags);

    // Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
    // non-blocking using EINPROGRESS.
    for (;;) {
      if (::connect(fd, addr, addrlen) < 0) {
        int error = errno;
        if (error == EINPROGRESS) {
          // Fine.
          break;
        } else if (error != EINTR) {
          KJ_FAIL_SYSCALL("connect()", error) { break; }
          return Own<AsyncIoStream>();
        }
      } else {
        // no error
        break;
      }
    }
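    // Once the socket becomes writable, the outcome of the asynchronous connect is reported via
    // SO_ERROR; the getsockopt(SO_ERROR) check below raises it as a "connect()" failure if it is
    // non-zero.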

    auto connected = result->waitConnected();
    return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
      int err;
      socklen_t errlen = sizeof(err);
      KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
      if (err != 0) {
        KJ_FAIL_SYSCALL("connect()", err) { break; }
      }
      return kj::mv(stream);
    }));
  }
  Own<ConnectionReceiver> wrapListenSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<FdConnectionReceiver>(eventPort, fd, filter, flags);
  }
  Own<DatagramPort> wrapDatagramSocketFd(
      int fd, NetworkFilter& filter, uint flags = 0) override {
    return heap<DatagramPortImpl>(*this, eventPort, fd, filter, flags);
  }

  Timer& getTimer() override { return eventPort.getTimer(); }

  UnixEventPort& getEventPort() { return eventPort; }

private:
  UnixEventPort eventPort;
  EventLoop eventLoop;
  WaitScope waitScope;
};

// =======================================================================================

class NetworkAddressImpl final: public NetworkAddress {
public:
  NetworkAddressImpl(LowLevelAsyncIoProvider& lowLevel,
                     LowLevelAsyncIoProvider::NetworkFilter& filter,
                     Array<SocketAddress> addrs)
      : lowLevel(lowLevel), filter(filter), addrs(kj::mv(addrs)) {}

  Promise<Own<AsyncIoStream>> connect() override {
    auto addrsCopy = heapArray(addrs.asPtr());
    auto promise = connectImpl(lowLevel, filter, addrsCopy);
    return promise.attach(kj::mv(addrsCopy));
  }

  Own<ConnectionReceiver> listen() override {
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
          "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_STREAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);

      // TODO(someday):  Let queue size be specified explicitly in string addresses.
      KJ_SYSCALL(::listen(fd, SOMAXCONN));
    }

    return lowLevel.wrapListenSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<DatagramPort> bindDatagramPort() override {
    if (addrs.size() > 1) {
      KJ_LOG(WARNING, "Bind address resolved to multiple addresses.  Only the first address will "
          "be used.  If this is incorrect, specify the address numerically.  This may be fixed "
          "in the future.", addrs[0].toString());
    }

    int fd = addrs[0].socket(SOCK_DGRAM);

    {
      KJ_ON_SCOPE_FAILURE(close(fd));

      // We always enable SO_REUSEADDR because having to take your server down for five minutes
      // before it can restart really sucks.
      int optval = 1;
      KJ_SYSCALL(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));

      addrs[0].bind(fd);
    }

    return lowLevel.wrapDatagramSocketFd(fd, filter, NEW_FD_FLAGS);
  }

  Own<NetworkAddress> clone() override {
    return kj::heap<NetworkAddressImpl>(lowLevel, filter, kj::heapArray(addrs.asPtr()));
  }

  String toString() override {
    return strArray(KJ_MAP(addr, addrs) { return addr.toString(); }, ",");
  }

  const SocketAddress& chooseOneAddress() {
    KJ_REQUIRE(addrs.size() > 0, "No addresses available.");
    return addrs[counter++ % addrs.size()];
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  LowLevelAsyncIoProvider::NetworkFilter& filter;
  Array<SocketAddress> addrs;
  uint counter = 0;

  static Promise<Own<AsyncIoStream>> connectImpl(
      LowLevelAsyncIoProvider& lowLevel,
      LowLevelAsyncIoProvider::NetworkFilter& filter,
      ArrayPtr<SocketAddress> addrs) {
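    // Try the first address; if the connection attempt fails and more addresses remain, recurse
    // on the rest of the list.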
    KJ_ASSERT(addrs.size() > 0);

    return kj::evalNow([&]() -> Promise<Own<AsyncIoStream>> {
      if (!addrs[0].allowedBy(filter)) {
        return KJ_EXCEPTION(FAILED, "connect() blocked by restrictPeers()");
      } else {
        int fd = addrs[0].socket(SOCK_STREAM);
        return lowLevel.wrapConnectingSocketFd(
            fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
      }
    }).then([](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> {
      // Success, pass along.
      return kj::mv(stream);
    }, [&lowLevel,&filter,addrs](Exception&& exception) mutable -> Promise<Own<AsyncIoStream>> {
      // Connect failed.
      if (addrs.size() > 1) {
        // Try the next address instead.
        return connectImpl(lowLevel, filter, addrs.slice(1, addrs.size()));
      } else {
        // No more addresses to try, so propagate the exception.
        return kj::mv(exception);
      }
    });
  }
};

class SocketNetwork final: public Network {
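  // Network implementation backed by BSD sockets.  Holds a NetworkFilter so that restrictPeers()
  // can layer additional allow/deny rules on top of a parent network's filter.
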
public:
  explicit SocketNetwork(LowLevelAsyncIoProvider& lowLevel): lowLevel(lowLevel) {}
  explicit SocketNetwork(SocketNetwork& parent,
                         kj::ArrayPtr<const kj::StringPtr> allow,
                         kj::ArrayPtr<const kj::StringPtr> deny)
      : lowLevel(parent.lowLevel), filter(allow, deny, parent.filter) {}

  Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
    return evalLater(mvCapture(heapString(addr), [this,portHint](String&& addr) {
      return SocketAddress::parse(lowLevel, addr, portHint, filter);
    })).then([this](Array<SocketAddress> addresses) -> Own<NetworkAddress> {
      return heap<NetworkAddressImpl>(lowLevel, filter, kj::mv(addresses));
    });
  }

  Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
    auto array = kj::heapArrayBuilder<SocketAddress>(1);
    array.add(SocketAddress(sockaddr, len));
    KJ_REQUIRE(array[0].allowedBy(filter), "address blocked by restrictPeers()") { break; }
    return Own<NetworkAddress>(heap<NetworkAddressImpl>(lowLevel, filter, array.finish()));
  }

  Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
    return heap<SocketNetwork>(*this, allow, deny);
  }

private:
  LowLevelAsyncIoProvider& lowLevel;
  _::NetworkFilter filter;
};

// =======================================================================================

Promise<size_t> DatagramPortImpl::send(
    const void* buffer, size_t size, NetworkAddress& destination) {
  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendto(fd, buffer, size, 0, addr.getRaw(), addr.getRawSize()));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, buffer, size, &destination]() {
      return send(buffer, size, destination);
    });
  } else {
    // If less than the whole message was sent, then it got truncated, and there's nothing we can
    // do about it.
    return n;
  }
}

Promise<size_t> DatagramPortImpl::send(
    ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) {
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));

  auto& addr = downcast<NetworkAddressImpl>(destination).chooseOneAddress();
  msg.msg_name = const_cast<void*>(implicitCast<const void*>(addr.getRaw()));
  msg.msg_namelen = addr.getRawSize();

  const size_t iovmax = kj::miniposix::iovMax(pieces.size());
  KJ_STACK_ARRAY(struct iovec, iov, kj::min(pieces.size(), iovmax), 16, 64);

  for (size_t i: kj::indices(pieces)) {
    iov[i].iov_base = const_cast<void*>(implicitCast<const void*>(pieces[i].begin()));
    iov[i].iov_len = pieces[i].size();
  }

  Array<byte> extra;
  if (pieces.size() > iovmax) {
    // Too many pieces, but we can't use multiple syscalls because they'd send separate
    // datagrams. We'll have to copy the trailing pieces into a temporary array.
    //
    // TODO(perf): On Linux we could use multiple syscalls via MSG_MORE.
    size_t extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      extraSize += pieces[i].size();
    }
    extra = kj::heapArray<byte>(extraSize);
    extraSize = 0;
    for (size_t i = iovmax - 1; i < pieces.size(); i++) {
      memcpy(extra.begin() + extraSize, pieces[i].begin(), pieces[i].size());
      extraSize += pieces[i].size();
    }
    iov[iovmax - 1].iov_base = extra.begin();
    iov[iovmax - 1].iov_len = extra.size();
  }

  msg.msg_iov = iov.begin();
  msg.msg_iovlen = iov.size();

  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = sendmsg(fd, &msg, 0));
  if (n < 0) {
    // Write buffer full.
    return observer.whenBecomesWritable().then([this, pieces, &destination]() {
      return send(pieces, destination);
    });
  } else {
    // If less than the whole message was sent, then it was truncated, and there's nothing we can
    // do about that now.
    return n;
  }
}

class DatagramPortImpl::ReceiverImpl final: public DatagramReceiver {
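  // Receives one datagram at a time into a reusable content buffer (plus an optional ancillary
  // buffer for control messages), recording truncation flags and the sender's address.
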
public:
  explicit ReceiverImpl(DatagramPortImpl& port, Capacity capacity)
      : port(port),
        contentBuffer(heapArray<byte>(capacity.content)),
        ancillaryBuffer(capacity.ancillary > 0 ? heapArray<byte>(capacity.ancillary)
                                               : Array<byte>(nullptr)) {}

  Promise<void> receive() override {
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));

    struct sockaddr_storage addr;
    memset(&addr, 0, sizeof(addr));
    msg.msg_name = &addr;
    msg.msg_namelen = sizeof(addr);

    struct iovec iov;
    iov.iov_base = contentBuffer.begin();
    iov.iov_len = contentBuffer.size();
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = ancillaryBuffer.begin();
    msg.msg_controllen = ancillaryBuffer.size();

    ssize_t n;
    KJ_NONBLOCKING_SYSCALL(n = recvmsg(port.fd, &msg, 0));

    if (n < 0) {
      // No data available. Wait.
      return port.observer.whenBecomesReadable().then([this]() {
        return receive();
      });
    } else {
      if (!port.filter.shouldAllow(reinterpret_cast<const struct sockaddr*>(msg.msg_name),
                                   msg.msg_namelen)) {
        // Ignore message from disallowed source.
        return receive();
      }

      receivedSize = n;
      contentTruncated = msg.msg_flags & MSG_TRUNC;

      source.emplace(port.lowLevel, port.filter, msg.msg_name, msg.msg_namelen);

      ancillaryList.resize(0);
      ancillaryTruncated = msg.msg_flags & MSG_CTRUNC;

      for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
           cmsg = CMSG_NXTHDR(&msg, cmsg)) {
        // On some platforms (OSX), a cmsghdr's length may cross the end of the ancillary buffer
        // when truncated. On other platforms (Linux) the length in cmsghdr will itself be
        // truncated to fit within the buffer.

        const byte* pos = reinterpret_cast<const byte*>(cmsg);
        size_t available = ancillaryBuffer.end() - pos;
        if (available < CMSG_SPACE(0)) {
          // The buffer ends in the middle of the header. We can't use this message.
          // (On Linux, this never happens, because the message is not included if there isn't
          // space for a header. I'm not sure how other systems behave, though, so let's be safe.)
          break;
        }

        // OK, we know the cmsghdr is valid, at least.

        // Find the start of the message payload.
        const byte* begin = (const byte *)CMSG_DATA(cmsg);

        // Cap the message length to the available space.
        const byte* end = pos + kj::min(available, cmsg->cmsg_len);

        ancillaryList.add(AncillaryMessage(
            cmsg->cmsg_level, cmsg->cmsg_type, arrayPtr(begin, end)));
      }

      return READY_NOW;
    }
  }

  MaybeTruncated<ArrayPtr<const byte>> getContent() override {
    return { contentBuffer.slice(0, receivedSize), contentTruncated };
  }

  MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() override {
    return { ancillaryList.asPtr(), ancillaryTruncated };
  }

  NetworkAddress& getSource() override {
    return KJ_REQUIRE_NONNULL(source, "Haven't received a message yet.").abstract;
  }

private:
  DatagramPortImpl& port;
  Array<byte> contentBuffer;
  Array<byte> ancillaryBuffer;
  Vector<AncillaryMessage> ancillaryList;
  size_t receivedSize = 0;
  bool contentTruncated = false;
  bool ancillaryTruncated = false;

  struct StoredAddress {
    StoredAddress(LowLevelAsyncIoProvider& lowLevel, LowLevelAsyncIoProvider::NetworkFilter& filter,
                  const void* sockaddr, uint length)
        : raw(sockaddr, length),
          abstract(lowLevel, filter, Array<SocketAddress>(&raw, 1, NullArrayDisposer::instance)) {}

    SocketAddress raw;
    NetworkAddressImpl abstract;
  };

  kj::Maybe<StoredAddress> source;
};

Own<DatagramReceiver> DatagramPortImpl::makeReceiver(DatagramReceiver::Capacity capacity) {
  return kj::heap<ReceiverImpl>(*this, capacity);
}
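
// Example receive loop (a sketch only, not part of this file's API surface; `port` and
// `waitScope` are assumed to come from the caller, e.g. via NetworkAddress::bindDatagramPort()
// and the event loop's WaitScope, and the default receiver capacity is assumed):
//
//   auto receiver = port->makeReceiver();
//   receiver->receive().wait(waitScope);
//   auto content = receiver->getContent();   // datagram bytes plus truncation flag
//   auto& sender = receiver->getSource();    // address of the peer that sent the datagram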

// =======================================================================================

class AsyncIoProviderImpl final: public AsyncIoProvider {
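  // Implements AsyncIoProvider on top of a LowLevelAsyncIoProvider: pipes and socketpairs are
  // created with ordinary POSIX calls and then handed to the low-level provider for wrapping.
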
public:
  AsyncIoProviderImpl(LowLevelAsyncIoProvider& lowLevel)
      : lowLevel(lowLevel), network(lowLevel) {}

  OneWayPipe newOneWayPipe() override {
    int fds[2];
#if __linux__ && !__BIONIC__
    KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
    KJ_SYSCALL(pipe(fds));
#endif
    return OneWayPipe {
      lowLevel.wrapInputFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapOutputFd(fds[1], NEW_FD_FLAGS)
    };
  }

  TwoWayPipe newTwoWayPipe() override {
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return TwoWayPipe { {
      lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  CapabilityPipe newCapabilityPipe() override {
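    // Like newTwoWayPipe(), but the socketpair ends are wrapped as Unix-domain streams so that
    // file descriptors can be passed across the pipe.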
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
    return CapabilityPipe { {
      lowLevel.wrapUnixSocketFd(fds[0], NEW_FD_FLAGS),
      lowLevel.wrapUnixSocketFd(fds[1], NEW_FD_FLAGS)
    } };
  }

  Network& getNetwork() override {
    return network;
  }

  PipeThread newPipeThread(
      Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) override {
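    // Run `startFunc` on a new thread with its own event loop and I/O provider; the two threads
    // communicate over a socketpair, whose parent end is returned as the pipe.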
    int fds[2];
    int type = SOCK_STREAM;
#if __linux__ && !__BIONIC__
    type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
    KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));

    int threadFd = fds[1];
    KJ_ON_SCOPE_FAILURE(close(threadFd));

    auto pipe = lowLevel.wrapSocketFd(fds[0], NEW_FD_FLAGS);

    auto thread = heap<Thread>(kj::mvCapture(startFunc,
        [threadFd](Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)>&& startFunc) {
      LowLevelAsyncIoProviderImpl lowLevel;
      auto stream = lowLevel.wrapSocketFd(threadFd, NEW_FD_FLAGS);
      AsyncIoProviderImpl ioProvider(lowLevel);
      startFunc(ioProvider, *stream, lowLevel.getWaitScope());
    }));

    return { kj::mv(thread), kj::mv(pipe) };
  }

  Timer& getTimer() override { return lowLevel.getTimer(); }

private:
  LowLevelAsyncIoProvider& lowLevel;
  SocketNetwork network;
};

}  // namespace

Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel) {
  return kj::heap<AsyncIoProviderImpl>(lowLevel);
}

AsyncIoContext setupAsyncIo() {
  auto lowLevel = heap<LowLevelAsyncIoProviderImpl>();
  auto ioProvider = kj::heap<AsyncIoProviderImpl>(*lowLevel);
  auto& waitScope = lowLevel->getWaitScope();
  auto& eventPort = lowLevel->getEventPort();
  return { kj::mv(lowLevel), kj::mv(ioProvider), waitScope, eventPort };
}
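
// Minimal usage sketch for setupAsyncIo() (not part of this file; the host name, port, and
// request bytes below are placeholders):
//
//   kj::AsyncIoContext io = kj::setupAsyncIo();
//   kj::Network& network = io.provider->getNetwork();
//   auto addr = network.parseAddress("example.com", 80).wait(io.waitScope);
//   auto stream = addr->connect().wait(io.waitScope);
//   stream->write("GET / HTTP/1.1\r\n\r\n", 18).wait(io.waitScope);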

}  // namespace kj

#endif  // !_WIN32