// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#ifndef KJ_ASYNC_IO_H_
#define KJ_ASYNC_IO_H_

#if defined(__GNUC__) && !KJ_HEADER_WARNINGS
#pragma GCC system_header
#endif

#include "async.h"
#include "function.h"
#include "thread.h"
#include "timer.h"

struct sockaddr;

namespace kj {

#if _WIN32
class Win32EventPort;
class AutoCloseHandle;
#else
class UnixEventPort;
#endif

class AutoCloseFd;
class NetworkAddress;
class AsyncOutputStream;
class AsyncIoStream;

// =======================================================================================
// Streaming I/O

class AsyncInputStream {
  // Asynchronous equivalent of InputStream (from io.h).

public:
  virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes);
  virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;

  Promise<void> read(void* buffer, size_t bytes);

  virtual Maybe<uint64_t> tryGetLength();
  // Get the remaining number of bytes that will be produced by this stream, if known.
  //
  // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
  // HTTP implementation may need to fall back to Transfer-Encoding: chunked.
  //
  // The default implementation always returns null.

  virtual Promise<uint64_t> pumpTo(
      AsyncOutputStream& output, uint64_t amount = kj::maxValue);
  // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
  // total bytes actually pumped (which is only less than `amount` if EOF was reached).
  //
  // Override this if your stream type knows how to pump itself to certain kinds of output
  // streams more efficiently than via the naive approach. You can use
  // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
  // delegate to the default implementation.
  //
  // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
  // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.

  Promise<Array<byte>> readAllBytes();
  Promise<String> readAllText();
  // Read until EOF and return as one big byte array or string.
};

class AsyncOutputStream {
  // Asynchronous equivalent of OutputStream (from io.h).

public:
  virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0;
  virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces)
      KJ_WARN_UNUSED_RESULT = 0;

  virtual Maybe<Promise<uint64_t>> tryPumpFrom(
      AsyncInputStream& input, uint64_t amount = kj::maxValue);
  // Implements double-dispatch for AsyncInputStream::pumpTo().
  //
  // This method should only be called from within an implementation of pumpTo().
  //
  // This method examines the type of `input` to find optimized ways to pump data from it to this
  // output stream. If it finds one, it performs the pump. Otherwise, it returns null.
  //
  // The default implementation always returns null.
};

class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
  // A combination input and output stream.

public:
  virtual void shutdownWrite() = 0;
  // Cleanly shut down just the write end of the stream, while keeping the read end open.

  virtual void abortRead() {}
  // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only
  // be called when an error has occurred.

  virtual void getsockopt(int level, int option, void* value, uint* length);
  virtual void setsockopt(int level, int option, const void* value, uint length);
  // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
  // if the stream is not a socket or the option is not appropriate for the socket type. The
  // default implementations always throw "unimplemented".

  virtual void getsockname(struct sockaddr* addr, uint* length);
  virtual void getpeername(struct sockaddr* addr, uint* length);
  // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented"
  // exception if the stream is not a socket. The default implementations always throw
  // "unimplemented".
  //
  // Note that we don't provide methods that return NetworkAddress because it usually wouldn't
  // be useful. You can't connect() to or listen() on these addresses, obviously, because they are
  // ephemeral addresses for a single connection.
};

class AsyncCapabilityStream: public AsyncIoStream {
  // An AsyncIoStream that also allows sending and receiving new connections or other kinds of
  // capabilities, in addition to simple data.
  //
  // For correct functioning, a protocol must be designed such that the receiver knows when to
  // expect a capability transfer. The receiver must not read() when a capability is expected, and
  // must not receiveStream() when data is expected -- if it does, an exception may be thrown or
  // invalid data may be returned. This implies that data sent over an AsyncCapabilityStream must
  // be framed such that the receiver knows exactly how many bytes to read before receiving a
  // capability.
  //
  // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor
  // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally
  // read()s when it should have called receiveStream(), it will observe a NUL byte in the data
  // and the capability will be discarded. Of course, an application should not depend on this
  // behavior; it should avoid read()ing through a capability.
  //
  // KJ does not provide any implementation of this type on Windows, as there's no obvious
  // implementation there. Handle passing on Windows requires at least one of the processes
  // involved to have permission to modify the other's handle table, which is effectively full
  // control. Handle passing between mutually non-trusting processes would require a trusted
  // broker process to facilitate. One could possibly implement this type in terms of such a
  // broker, or in terms of direct handle passing if at least one process trusts the other.

public:
  Promise<Own<AsyncCapabilityStream>> receiveStream();
  virtual Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream() = 0;
  virtual Promise<void> sendStream(Own<AsyncCapabilityStream> stream) = 0;
  // Transfer a stream.

  Promise<AutoCloseFd> receiveFd();
  virtual Promise<Maybe<AutoCloseFd>> tryReceiveFd();
  virtual Promise<void> sendFd(int fd);
  // Transfer a raw file descriptor. Default implementation throws UNIMPLEMENTED.
};

struct OneWayPipe {
  // A data pipe with an input end and an output end.  (Typically backed by pipe() system call.)

  Own<AsyncInputStream> in;
  Own<AsyncOutputStream> out;
};

struct TwoWayPipe {
  // A data pipe that supports sending in both directions.  Each end's output sends data to the
  // other end's input.  (Typically backed by socketpair() system call.)

  Own<AsyncIoStream> ends[2];
};

struct CapabilityPipe {
  // Like TwoWayPipe but allowing capability-passing.

  Own<AsyncCapabilityStream> ends[2];
};

class ConnectionReceiver {
  // Represents a server socket listening on a port.

public:
  virtual Promise<Own<AsyncIoStream>> accept() = 0;
  // Accept the next incoming connection.

  virtual uint getPort() = 0;
  // Gets the port number, if applicable (i.e. if listening on IP).  This is useful if you didn't
  // specify a port when constructing the NetworkAddress -- one will have been assigned
  // automatically.

  virtual void getsockopt(int level, int option, void* value, uint* length);
  virtual void setsockopt(int level, int option, const void* value, uint length);
  // Same as the methods of AsyncIoStream.
};

// =======================================================================================
// Datagram I/O

class AncillaryMessage {
  // Represents an ancillary message (aka control message) received using the recvmsg() system
  // call (or equivalent). Most apps will not use this.

public:
  inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
  AncillaryMessage() = default;

  inline int getLevel() const;
  // Originating protocol / socket level.

  inline int getType() const;
  // Protocol-specific message type.

  template <typename T>
  inline Maybe<const T&> as();
  // Interpret the ancillary message as the given struct type. Most ancillary messages are some
  // sort of struct, so this is a convenient way to access it. Returns nullptr if the message
  // is smaller than the struct -- this can happen if the message was truncated due to
  // insufficient ancillary buffer space.

  template <typename T>
  inline ArrayPtr<const T> asArray();
  // Interpret the ancillary message as an array of items. If the message size does not evenly
  // divide into elements of type T, the remainder is discarded -- this can happen if the message
  // was truncated due to insufficient ancillary buffer space.

private:
  int level;
  int type;
  ArrayPtr<const byte> data;
  // Message data. In most cases you should use `as()` or `asArray()`.
};

class DatagramReceiver {
  // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
  // capacity in advance; if a received packet is larger than the capacity, it will be truncated.

public:
  virtual Promise<void> receive() = 0;
  // Receive a new message, overwriting this object's content.
  //
  // receive() may reuse the same buffers for content and ancillary data with each call.

  template <typename T>
  struct MaybeTruncated {
    T value;

    bool isTruncated;
    // True if the Receiver's capacity was insufficient to receive the value and therefore the
    // value is truncated.
  };

  virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
  // Get the content of the datagram.

  virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
  // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr
  // struct. Most apps don't need this.
  //
  // If the returned value is truncated, then the last message in the array may itself be
  // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
  // return fewer elements than expected. Truncation can also mean that additional messages were
  // available but discarded.

  virtual NetworkAddress& getSource() = 0;
  // Get the datagram sender's address.

  struct Capacity {
    size_t content = 8192;
    // How much space to allocate for the datagram content. If a datagram is received that is
    // larger than this, it will be truncated, with no way to recover the tail.

    size_t ancillary = 0;
    // How much space to allocate for ancillary messages. As with content, if the ancillary data
    // is larger than this, it will be truncated.
  };
};

class DatagramPort {
public:
  virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
  virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
                               NetworkAddress& destination) = 0;

  virtual Own<DatagramReceiver> makeReceiver(
      DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
  // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
  // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.

  virtual uint getPort() = 0;
  // Gets the port number, if applicable (i.e. if listening on IP).  This is useful if you didn't
  // specify a port when constructing the NetworkAddress -- one will have been assigned
  // automatically.

  virtual void getsockopt(int level, int option, void* value, uint* length);
  virtual void setsockopt(int level, int option, const void* value, uint length);
  // Same as the methods of AsyncIoStream.
};

// =======================================================================================
// Networks

class NetworkAddress {
  // Represents a remote address to which the application can connect.

public:
  virtual Promise<Own<AsyncIoStream>> connect() = 0;
  // Make a new connection to this address.
  //
  // The address must not be a wildcard ("*").  If it is an IP address, it must have a port number.

  virtual Own<ConnectionReceiver> listen() = 0;
  // Listen for incoming connections on this address.
  //
  // The address must be local.

  virtual Own<DatagramPort> bindDatagramPort();
  // Open this address as a datagram (e.g. UDP) port.
  //
  // The address must be local.

  virtual Own<NetworkAddress> clone() = 0;
  // Returns an equivalent copy of this NetworkAddress.

  virtual String toString() = 0;
  // Produce a human-readable string which hopefully can be passed to Network::parseAddress()
  // to reproduce this address, although whether or not that works of course depends on the Network
  // implementation.  This should be called only to display the address to human users, who will
  // hopefully know what they are able to do with it.
};

class Network {
  // Factory for NetworkAddress instances, representing the network services offered by the
  // operating system.
  //
  // This interface typically represents broad authority, and well-designed code should limit its
  // use to high-level startup code and user interaction.  Low-level APIs should accept
  // NetworkAddress instances directly and work from there, if at all possible.

public:
  virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0;
  // Construct a network address from a user-provided string.  The format of the address
  // strings is not specified at the API level, and application code should make no assumptions
  // about them.  These strings should always be provided by humans, and said humans will know
  // what format to use in their particular context.
  //
  // `portHint`, if provided, specifies the "standard" IP port number for the application-level
  // service in play.  If the address turns out to be an IP address (v4 or v6), and it lacks a
  // port number, this port will be used.  If `addr` lacks a port number *and* `portHint` is
  // omitted, then the returned address will only support listen() and bindDatagramPort()
  // (not connect()), and an unused port will be chosen each time one of those methods is called.

  virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
  // Construct a network address from a legacy struct sockaddr.

  virtual Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0;
  // Constructs a new Network instance wrapping this one which restricts which peer addresses are
  // permitted (both for outgoing and incoming connections).
  //
  // Communication will be allowed only with peers whose addresses match one of the patterns
  // specified in the `allow` array. If a `deny` array is specified, then any address which matches
  // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be
  // denied.
  //
  // The syntax of address patterns depends on the network, except that three special patterns are
  // defined for all networks:
  // - "private": Matches network addresses that are reserved by standards for private networks,
  //   such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local".
  // - "public": Opposite of "private".
  // - "local": Matches network addresses that are defined by standards to only be accessible from
  //   the local machine, such as "127.0.0.0/8" or Unix domain addresses.
  // - "network": Opposite of "local".
  //
  // For the standard KJ network implementation, the following patterns are also recognized:
  // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or
  //   "2001:db8::/32".
  // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a
  //   glob.)
  // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may
  //   support specifying a glob.)
  //
  // Network restrictions apply *after* DNS resolution (otherwise they'd be useless).
  //
  // It is legal to parseAddress() a restricted address. An exception won't be thrown until
  // connect() is called.
  //
  // It's possible to listen() on a restricted address. However, connections will only be accepted
  // from non-restricted addresses; others will be dropped. If a particular listen address has no
  // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then
  // listen() may throw (or may simply never receive any connections).
  //
  // Examples:
  //
  //     auto restricted = network->restrictPeers({"public"});
  //
  // Allows connections only to/from public internet addresses. Use this when connecting to an
  // address specified by a third party that is not trusted and is not themselves already on your
  // private network.
  //
  //     auto restricted = network->restrictPeers({"private"});
  //
  // Allows connections only to/from the private network. Use this on the server side to reject
  // connections from the public internet.
  //
  //     auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"});
  //
  // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked.
  //
  //     auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"});
  //
  // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an
  // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow
  // rule that is more specific than the deny rule).
};

// =======================================================================================
// I/O Provider

class AsyncIoProvider {
  // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
  //
  // Generally, the implementation of this interface must integrate closely with a particular
  // `EventLoop` implementation.  Typically, the EventLoop implementation itself will provide
  // an AsyncIoProvider.

public:
  virtual OneWayPipe newOneWayPipe() = 0;
  // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with
  // the pipe(2) system call).

  virtual TwoWayPipe newTwoWayPipe() = 0;
  // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with
  // socketpair(2) system call).  Data written to one end can be read from the other.

  virtual CapabilityPipe newCapabilityPipe();
  // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe.
  //
  // The default implementation throws an unimplemented exception. In particular this is not
  // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to
  // pass handles over a stream.

  virtual Network& getNetwork() = 0;
  // Creates a new `Network` instance representing the networks exposed by the operating system.
  //
  // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function.  If
  // you call this from low-level code, then you are preventing higher-level code from injecting an
  // alternative implementation.  Instead, if your code needs to use network functionality, it
  // should ask for a `Network` as a constructor or method parameter, so that higher-level code can
  // chose what implementation to use.  The system network is essentially a singleton.  See:
  //     http://www.object-oriented-security.org/lets-argue/singletons
  //
  // Code that uses the system network should not make any assumptions about what kinds of
  // addresses it will parse, as this could differ across platforms.  String addresses should come
  // strictly from the user, who will know how to write them correctly for their system.
  //
  // With that said, KJ currently supports the following string address formats:
  // - IPv4: "1.2.3.4", "1.2.3.4:80"
  // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
  // - Local IP wildcard (covers both v4 and v6):  "*", "*:80"
  // - Symbolic names:  "example.com", "example.com:80", "example.com:http", "1.2.3.4:http"
  // - Unix domain: "unix:/path/to/socket"

  struct PipeThread {
    // A combination of a thread and a two-way pipe that communicates with that thread.
    //
    // The fields are intentionally ordered so that the pipe will be destroyed (and therefore
    // disconnected) before the thread is destroyed (and therefore joined).  Thus if the thread
    // arranges to exit when it detects disconnect, destruction should be clean.

    Own<Thread> thread;
    Own<AsyncIoStream> pipe;
  };

  virtual PipeThread newPipeThread(
      Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0;
  // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate
  // with it.  One end of the pipe is passed to the thread's start function and the other end of
  // the pipe is returned.  The new thread also gets its own `AsyncIoProvider` instance and will
  // already have an active `EventLoop` when `startFunc` is called.
  //
  // TODO(someday):  I'm not entirely comfortable with this interface.  It seems to be doing too
  //   much at once but I'm not sure how to cleanly break it down.

  virtual Timer& getTimer() = 0;
  // Returns a `Timer` based on real time.  Time does not pass while event handlers are running --
  // it only updates when the event loop polls for system events.  This means that calling `now()`
  // on this timer does not require a system call.
  //
  // This timer is not affected by changes to the system date.  It is unspecified whether the timer
  // continues to count while the system is suspended.
};

class LowLevelAsyncIoProvider {
  // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on
  // different operating systems.  You should prefer to use `AsyncIoProvider` over this interface
  // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection.
  //
  // On Unix, this interface can be used to import native file descriptors into the async framework.
  // Different implementations of this interface might work on top of different event handling
  // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library.
  //
  // On Windows, this interface can be used to import native SOCKETs into the async framework.
  // Different implementations of this interface might work on top of different event handling
  // primitives, such as I/O completion ports vs. completion routines.

public:
  enum Flags {
    // Flags controlling how to wrap a file descriptor.

    TAKE_OWNERSHIP = 1 << 0,
    // The returned object should own the file descriptor, automatically closing it when destroyed.
    // The close-on-exec flag will be set on the descriptor if it is not already.
    //
    // If this flag is not used, then the file descriptor is not automatically closed and the
    // close-on-exec flag is not modified.

#if !_WIN32
    ALREADY_CLOEXEC = 1 << 1,
    // Indicates that the close-on-exec flag is known already to be set, so need not be set again.
    // Only relevant when combined with TAKE_OWNERSHIP.
    //
    // On Linux, all system calls which yield new file descriptors have flags or variants which
    // set the close-on-exec flag immediately.  Unfortunately, other OS's do not.

    ALREADY_NONBLOCK = 1 << 2
    // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag
    // need not be set again.  Otherwise, all wrap*Fd() methods will enable non-blocking mode
    // automatically.
    //
    // On Linux, all system calls which yield new file descriptors have flags or variants which
    // enable non-blocking mode immediately.  Unfortunately, other OS's do not.
#endif
  };

#if _WIN32
  typedef uintptr_t Fd;
  typedef AutoCloseHandle OwnFd;
  // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the
  // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify
  // explicitly).
#else
  typedef int Fd;
  typedef AutoCloseFd OwnFd;
  // On Unix, any arbitrary file descriptor is supported.
#endif

  virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncInputStream wrapping a file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncOutputStream wrapping a file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0;
  // Create an AsyncIoStream wrapping a socket file descriptor.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

#if !_WIN32
  virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0);
  // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be
  // a Unix domain socket.
  //
  // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with
  // LowLevelAsyncIoProvider implementations written before this method was added.
#endif

  virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
  // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
  // The returned promise does not resolve until connection has completed.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  class NetworkFilter {
  public:
    virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0;
    // Returns true if incoming connections or datagrams from the given peer should be accepted.
    // If false, they will be dropped. This is used to implement kj::Network::restrictPeers().

    static NetworkFilter& getAllAllowed();
  };

  virtual Own<ConnectionReceiver> wrapListenSocketFd(
      Fd fd, NetworkFilter& filter, uint flags = 0) = 0;
  inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) {
    return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
  }
  // Create an AsyncIoStream wrapping a listen socket file descriptor.  This socket should already
  // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
  //
  // `flags` is a bitwise-OR of the values of the `Flags` enum.

  virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0);
  inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) {
    return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags);
  }

  virtual Timer& getTimer() = 0;
  // Returns a `Timer` based on real time.  Time does not pass while event handlers are running --
  // it only updates when the event loop polls for system events.  This means that calling `now()`
  // on this timer does not require a system call.
  //
  // This timer is not affected by changes to the system date.  It is unspecified whether the timer
  // continues to count while the system is suspended.

  Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0);
  Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0);
  Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0);
#if !_WIN32
  Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0);
#endif
  Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
      OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0);
  Own<ConnectionReceiver> wrapListenSocketFd(
      OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
  Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0);
  Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0);
  Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0);
  // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle
  // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`.
};

Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
// Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`.

struct AsyncIoContext {
  Own<LowLevelAsyncIoProvider> lowLevelProvider;
  Own<AsyncIoProvider> provider;
  WaitScope& waitScope;

#if _WIN32
  Win32EventPort& win32EventPort;
#else
  UnixEventPort& unixEventPort;
  // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This
  //   field will go away at some point when we have a chance to improve these interfaces.
#endif
};

AsyncIoContext setupAsyncIo();
// Convenience method which sets up the current thread with everything it needs to do async I/O.
// The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for
// doing I/O on the host system, so everything is ready for the thread to start making async calls
// and waiting on promises.
//
// You would typically call this in your main() loop or in the start function of a thread.
// Example:
//
//     int main() {
//       auto ioContext = kj::setupAsyncIo();
//
//       // Now we can call an async function.
//       Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com");
//
//       // And we can wait for the promise to complete.  Note that you can only use `wait()`
//       // from the top level, not from inside a promise callback.
//       String text = textPromise.wait(ioContext.waitScope);
//       print(text);
//       return 0;
//     }
//
// WARNING: An AsyncIoContext can only be used in the thread and process that created it. In
//   particular, note that after a fork(), an AsyncIoContext created in the parent process will
//   not work correctly in the child, even if the parent ceases to use its copy. In particular
//   note that this means that server processes which daemonize themselves at startup must wait
//   until after daemonization to create an AsyncIoContext.

// =======================================================================================
// Convenience adapters.

class CapabilityStreamConnectionReceiver final: public ConnectionReceiver {
  // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept()
  // calls receiveStream().

public:
  CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner)
      : inner(inner) {}

  Promise<Own<AsyncIoStream>> accept() override;
  uint getPort() override;

private:
  AsyncCapabilityStream& inner;
};

class CapabilityStreamNetworkAddress final: public NetworkAddress {
  // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress.
  //
  // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the
  // original capability stream, and returning the other end.
  //
  // listen().accept() is implemented by receiving new streams over the original stream.
  //
  // Note that clone() dosen't work (due to ownership issues) and toString() returns a static
  // string.

public:
  CapabilityStreamNetworkAddress(AsyncIoProvider& provider, AsyncCapabilityStream& inner)
      : provider(provider), inner(inner) {}

  Promise<Own<AsyncIoStream>> connect() override;
  Own<ConnectionReceiver> listen() override;

  Own<NetworkAddress> clone() override;
  String toString() override;

private:
  AsyncIoProvider& provider;
  AsyncCapabilityStream& inner;
};

// =======================================================================================
// inline implementation details

inline AncillaryMessage::AncillaryMessage(
    int level, int type, ArrayPtr<const byte> data)
    : level(level), type(type), data(data) {}

inline int AncillaryMessage::getLevel() const { return level; }
inline int AncillaryMessage::getType() const { return type; }

template <typename T>
inline Maybe<const T&> AncillaryMessage::as() {
  if (data.size() >= sizeof(T)) {
    return *reinterpret_cast<const T*>(data.begin());
  } else {
    return nullptr;
  }
}

template <typename T>
inline ArrayPtr<const T> AncillaryMessage::asArray() {
  return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
}

}  // namespace kj

#endif  // KJ_ASYNC_IO_H_