// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors // Licensed under the MIT License: // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #ifndef KJ_ASYNC_IO_H_ #define KJ_ASYNC_IO_H_ #if defined(__GNUC__) && !KJ_HEADER_WARNINGS #pragma GCC system_header #endif #include "async.h" #include "function.h" #include "thread.h" #include "timer.h" struct sockaddr; namespace kj { #if _WIN32 class Win32EventPort; class AutoCloseHandle; #else class UnixEventPort; #endif class AutoCloseFd; class NetworkAddress; class AsyncOutputStream; class AsyncIoStream; // ======================================================================================= // Streaming I/O class AsyncInputStream { // Asynchronous equivalent of InputStream (from io.h). public: virtual Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes); virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; Promise<void> read(void* buffer, size_t bytes); virtual Maybe<uint64_t> tryGetLength(); // Get the remaining number of bytes that will be produced by this stream, if known. // // This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the // HTTP implementation may need to fall back to Transfer-Encoding: chunked. // // The default implementation always returns null. virtual Promise<uint64_t> pumpTo( AsyncOutputStream& output, uint64_t amount = kj::maxValue); // Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the // total bytes actually pumped (which is only less than `amount` if EOF was reached). // // Override this if your stream type knows how to pump itself to certain kinds of output // streams more efficiently than via the naive approach. You can use // kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match, // delegate to the default implementation. // // The default implementation first tries calling output.tryPumpFrom(), but if that fails, it // performs a naive pump by allocating a buffer and reading to it / writing from it in a loop. Promise<Array<byte>> readAllBytes(); Promise<String> readAllText(); // Read until EOF and return as one big byte array or string. }; class AsyncOutputStream { // Asynchronous equivalent of OutputStream (from io.h). public: virtual Promise<void> write(const void* buffer, size_t size) KJ_WARN_UNUSED_RESULT = 0; virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) KJ_WARN_UNUSED_RESULT = 0; virtual Maybe<Promise<uint64_t>> tryPumpFrom( AsyncInputStream& input, uint64_t amount = kj::maxValue); // Implements double-dispatch for AsyncInputStream::pumpTo(). // // This method should only be called from within an implementation of pumpTo(). // // This method examines the type of `input` to find optimized ways to pump data from it to this // output stream. If it finds one, it performs the pump. Otherwise, it returns null. // // The default implementation always returns null. }; class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { // A combination input and output stream. public: virtual void shutdownWrite() = 0; // Cleanly shut down just the write end of the stream, while keeping the read end open. virtual void abortRead() {} // Similar to shutdownWrite, but this will shut down the read end of the stream, and should only // be called when an error has occurred. virtual void getsockopt(int level, int option, void* value, uint* length); virtual void setsockopt(int level, int option, const void* value, uint length); // Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception // if the stream is not a socket or the option is not appropriate for the socket type. The // default implementations always throw "unimplemented". virtual void getsockname(struct sockaddr* addr, uint* length); virtual void getpeername(struct sockaddr* addr, uint* length); // Corresponds to getsockname() and getpeername() syscalls. Will throw an "unimplemented" // exception if the stream is not a socket. The default implementations always throw // "unimplemented". // // Note that we don't provide methods that return NetworkAddress because it usually wouldn't // be useful. You can't connect() to or listen() on these addresses, obviously, because they are // ephemeral addresses for a single connection. }; class AsyncCapabilityStream: public AsyncIoStream { // An AsyncIoStream that also allows sending and receiving new connections or other kinds of // capabilities, in addition to simple data. // // For correct functioning, a protocol must be designed such that the receiver knows when to // expect a capability transfer. The receiver must not read() when a capability is expected, and // must not receiveStream() when data is expected -- if it does, an exception may be thrown or // invalid data may be returned. This implies that data sent over an AsyncCapabilityStream must // be framed such that the receiver knows exactly how many bytes to read before receiving a // capability. // // On Unix, KJ provides an implementation based on Unix domain sockets and file descriptor // passing via SCM_RIGHTS. Due to the nature of SCM_RIGHTS, if the application accidentally // read()s when it should have called receiveStream(), it will observe a NUL byte in the data // and the capability will be discarded. Of course, an application should not depend on this // behavior; it should avoid read()ing through a capability. // // KJ does not provide any implementation of this type on Windows, as there's no obvious // implementation there. Handle passing on Windows requires at least one of the processes // involved to have permission to modify the other's handle table, which is effectively full // control. Handle passing between mutually non-trusting processes would require a trusted // broker process to facilitate. One could possibly implement this type in terms of such a // broker, or in terms of direct handle passing if at least one process trusts the other. public: Promise<Own<AsyncCapabilityStream>> receiveStream(); virtual Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream() = 0; virtual Promise<void> sendStream(Own<AsyncCapabilityStream> stream) = 0; // Transfer a stream. Promise<AutoCloseFd> receiveFd(); virtual Promise<Maybe<AutoCloseFd>> tryReceiveFd(); virtual Promise<void> sendFd(int fd); // Transfer a raw file descriptor. Default implementation throws UNIMPLEMENTED. }; struct OneWayPipe { // A data pipe with an input end and an output end. (Typically backed by pipe() system call.) Own<AsyncInputStream> in; Own<AsyncOutputStream> out; }; struct TwoWayPipe { // A data pipe that supports sending in both directions. Each end's output sends data to the // other end's input. (Typically backed by socketpair() system call.) Own<AsyncIoStream> ends[2]; }; struct CapabilityPipe { // Like TwoWayPipe but allowing capability-passing. Own<AsyncCapabilityStream> ends[2]; }; class ConnectionReceiver { // Represents a server socket listening on a port. public: virtual Promise<Own<AsyncIoStream>> accept() = 0; // Accept the next incoming connection. virtual uint getPort() = 0; // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't // specify a port when constructing the NetworkAddress -- one will have been assigned // automatically. virtual void getsockopt(int level, int option, void* value, uint* length); virtual void setsockopt(int level, int option, const void* value, uint length); // Same as the methods of AsyncIoStream. }; // ======================================================================================= // Datagram I/O class AncillaryMessage { // Represents an ancillary message (aka control message) received using the recvmsg() system // call (or equivalent). Most apps will not use this. public: inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data); AncillaryMessage() = default; inline int getLevel() const; // Originating protocol / socket level. inline int getType() const; // Protocol-specific message type. template <typename T> inline Maybe<const T&> as(); // Interpret the ancillary message as the given struct type. Most ancillary messages are some // sort of struct, so this is a convenient way to access it. Returns nullptr if the message // is smaller than the struct -- this can happen if the message was truncated due to // insufficient ancillary buffer space. template <typename T> inline ArrayPtr<const T> asArray(); // Interpret the ancillary message as an array of items. If the message size does not evenly // divide into elements of type T, the remainder is discarded -- this can happen if the message // was truncated due to insufficient ancillary buffer space. private: int level; int type; ArrayPtr<const byte> data; // Message data. In most cases you should use `as()` or `asArray()`. }; class DatagramReceiver { // Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's // capacity in advance; if a received packet is larger than the capacity, it will be truncated. public: virtual Promise<void> receive() = 0; // Receive a new message, overwriting this object's content. // // receive() may reuse the same buffers for content and ancillary data with each call. template <typename T> struct MaybeTruncated { T value; bool isTruncated; // True if the Receiver's capacity was insufficient to receive the value and therefore the // value is truncated. }; virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0; // Get the content of the datagram. virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0; // Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr // struct. Most apps don't need this. // // If the returned value is truncated, then the last message in the array may itself be // truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will // return fewer elements than expected. Truncation can also mean that additional messages were // available but discarded. virtual NetworkAddress& getSource() = 0; // Get the datagram sender's address. struct Capacity { size_t content = 8192; // How much space to allocate for the datagram content. If a datagram is received that is // larger than this, it will be truncated, with no way to recover the tail. size_t ancillary = 0; // How much space to allocate for ancillary messages. As with content, if the ancillary data // is larger than this, it will be truncated. }; }; class DatagramPort { public: virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0; virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces, NetworkAddress& destination) = 0; virtual Own<DatagramReceiver> makeReceiver( DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0; // Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much // space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`. virtual uint getPort() = 0; // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't // specify a port when constructing the NetworkAddress -- one will have been assigned // automatically. virtual void getsockopt(int level, int option, void* value, uint* length); virtual void setsockopt(int level, int option, const void* value, uint length); // Same as the methods of AsyncIoStream. }; // ======================================================================================= // Networks class NetworkAddress { // Represents a remote address to which the application can connect. public: virtual Promise<Own<AsyncIoStream>> connect() = 0; // Make a new connection to this address. // // The address must not be a wildcard ("*"). If it is an IP address, it must have a port number. virtual Own<ConnectionReceiver> listen() = 0; // Listen for incoming connections on this address. // // The address must be local. virtual Own<DatagramPort> bindDatagramPort(); // Open this address as a datagram (e.g. UDP) port. // // The address must be local. virtual Own<NetworkAddress> clone() = 0; // Returns an equivalent copy of this NetworkAddress. virtual String toString() = 0; // Produce a human-readable string which hopefully can be passed to Network::parseAddress() // to reproduce this address, although whether or not that works of course depends on the Network // implementation. This should be called only to display the address to human users, who will // hopefully know what they are able to do with it. }; class Network { // Factory for NetworkAddress instances, representing the network services offered by the // operating system. // // This interface typically represents broad authority, and well-designed code should limit its // use to high-level startup code and user interaction. Low-level APIs should accept // NetworkAddress instances directly and work from there, if at all possible. public: virtual Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) = 0; // Construct a network address from a user-provided string. The format of the address // strings is not specified at the API level, and application code should make no assumptions // about them. These strings should always be provided by humans, and said humans will know // what format to use in their particular context. // // `portHint`, if provided, specifies the "standard" IP port number for the application-level // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is // omitted, then the returned address will only support listen() and bindDatagramPort() // (not connect()), and an unused port will be chosen each time one of those methods is called. virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; // Construct a network address from a legacy struct sockaddr. virtual Own<Network> restrictPeers( kj::ArrayPtr<const kj::StringPtr> allow, kj::ArrayPtr<const kj::StringPtr> deny = nullptr) KJ_WARN_UNUSED_RESULT = 0; // Constructs a new Network instance wrapping this one which restricts which peer addresses are // permitted (both for outgoing and incoming connections). // // Communication will be allowed only with peers whose addresses match one of the patterns // specified in the `allow` array. If a `deny` array is specified, then any address which matches // a pattern in `deny` and *does not* match any more-specific pattern in `allow` will also be // denied. // // The syntax of address patterns depends on the network, except that three special patterns are // defined for all networks: // - "private": Matches network addresses that are reserved by standards for private networks, // such as "10.0.0.0/8" or "192.168.0.0/16". This is a superset of "local". // - "public": Opposite of "private". // - "local": Matches network addresses that are defined by standards to only be accessible from // the local machine, such as "127.0.0.0/8" or Unix domain addresses. // - "network": Opposite of "local". // // For the standard KJ network implementation, the following patterns are also recognized: // - Network blocks specified in CIDR notation (ipv4 and ipv6), such as "192.0.2.0/24" or // "2001:db8::/32". // - "unix" to match all Unix domain addresses. (In the future, we may support specifying a // glob.) // - "unix-abstract" to match Linux's "abstract unix domain" addresses. (In the future, we may // support specifying a glob.) // // Network restrictions apply *after* DNS resolution (otherwise they'd be useless). // // It is legal to parseAddress() a restricted address. An exception won't be thrown until // connect() is called. // // It's possible to listen() on a restricted address. However, connections will only be accepted // from non-restricted addresses; others will be dropped. If a particular listen address has no // valid peers (e.g. because it's a unix socket address and unix sockets are not allowed) then // listen() may throw (or may simply never receive any connections). // // Examples: // // auto restricted = network->restrictPeers({"public"}); // // Allows connections only to/from public internet addresses. Use this when connecting to an // address specified by a third party that is not trusted and is not themselves already on your // private network. // // auto restricted = network->restrictPeers({"private"}); // // Allows connections only to/from the private network. Use this on the server side to reject // connections from the public internet. // // auto restricted = network->restrictPeers({"192.0.2.0/24"}, {"192.0.2.3/32"}); // // Allows connections only to/from 192.0.2.*, except 192.0.2.3 which is blocked. // // auto restricted = network->restrictPeers({"10.0.0.0/8", "10.1.2.3/32"}, {"10.1.2.0/24"}); // // Allows connections to/from 10.*.*.*, with the exception of 10.1.2.* (which is denied), with an // exception to the exception of 10.1.2.3 (which is allowed, because it is matched by an allow // rule that is more specific than the deny rule). }; // ======================================================================================= // I/O Provider class AsyncIoProvider { // Class which constructs asynchronous wrappers around the operating system's I/O facilities. // // Generally, the implementation of this interface must integrate closely with a particular // `EventLoop` implementation. Typically, the EventLoop implementation itself will provide // an AsyncIoProvider. public: virtual OneWayPipe newOneWayPipe() = 0; // Creates an input/output stream pair representing the ends of a one-way pipe (e.g. created with // the pipe(2) system call). virtual TwoWayPipe newTwoWayPipe() = 0; // Creates two AsyncIoStreams representing the two ends of a two-way pipe (e.g. created with // socketpair(2) system call). Data written to one end can be read from the other. virtual CapabilityPipe newCapabilityPipe(); // Creates two AsyncCapabilityStreams representing the two ends of a two-way capability pipe. // // The default implementation throws an unimplemented exception. In particular this is not // implemented by the default AsyncIoProvider on Windows, since Windows lacks any sane way to // pass handles over a stream. virtual Network& getNetwork() = 0; // Creates a new `Network` instance representing the networks exposed by the operating system. // // DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If // you call this from low-level code, then you are preventing higher-level code from injecting an // alternative implementation. Instead, if your code needs to use network functionality, it // should ask for a `Network` as a constructor or method parameter, so that higher-level code can // chose what implementation to use. The system network is essentially a singleton. See: // http://www.object-oriented-security.org/lets-argue/singletons // // Code that uses the system network should not make any assumptions about what kinds of // addresses it will parse, as this could differ across platforms. String addresses should come // strictly from the user, who will know how to write them correctly for their system. // // With that said, KJ currently supports the following string address formats: // - IPv4: "1.2.3.4", "1.2.3.4:80" // - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80" // - Local IP wildcard (covers both v4 and v6): "*", "*:80" // - Symbolic names: "example.com", "example.com:80", "example.com:http", "1.2.3.4:http" // - Unix domain: "unix:/path/to/socket" struct PipeThread { // A combination of a thread and a two-way pipe that communicates with that thread. // // The fields are intentionally ordered so that the pipe will be destroyed (and therefore // disconnected) before the thread is destroyed (and therefore joined). Thus if the thread // arranges to exit when it detects disconnect, destruction should be clean. Own<Thread> thread; Own<AsyncIoStream> pipe; }; virtual PipeThread newPipeThread( Function<void(AsyncIoProvider&, AsyncIoStream&, WaitScope&)> startFunc) = 0; // Create a new thread and set up a two-way pipe (socketpair) which can be used to communicate // with it. One end of the pipe is passed to the thread's start function and the other end of // the pipe is returned. The new thread also gets its own `AsyncIoProvider` instance and will // already have an active `EventLoop` when `startFunc` is called. // // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too // much at once but I'm not sure how to cleanly break it down. virtual Timer& getTimer() = 0; // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- // it only updates when the event loop polls for system events. This means that calling `now()` // on this timer does not require a system call. // // This timer is not affected by changes to the system date. It is unspecified whether the timer // continues to count while the system is suspended. }; class LowLevelAsyncIoProvider { // Similar to `AsyncIoProvider`, but represents a lower-level interface that may differ on // different operating systems. You should prefer to use `AsyncIoProvider` over this interface // whenever possible, as `AsyncIoProvider` is portable and friendlier to dependency-injection. // // On Unix, this interface can be used to import native file descriptors into the async framework. // Different implementations of this interface might work on top of different event handling // primitives, such as poll vs. epoll vs. kqueue vs. some higher-level event library. // // On Windows, this interface can be used to import native SOCKETs into the async framework. // Different implementations of this interface might work on top of different event handling // primitives, such as I/O completion ports vs. completion routines. public: enum Flags { // Flags controlling how to wrap a file descriptor. TAKE_OWNERSHIP = 1 << 0, // The returned object should own the file descriptor, automatically closing it when destroyed. // The close-on-exec flag will be set on the descriptor if it is not already. // // If this flag is not used, then the file descriptor is not automatically closed and the // close-on-exec flag is not modified. #if !_WIN32 ALREADY_CLOEXEC = 1 << 1, // Indicates that the close-on-exec flag is known already to be set, so need not be set again. // Only relevant when combined with TAKE_OWNERSHIP. // // On Linux, all system calls which yield new file descriptors have flags or variants which // set the close-on-exec flag immediately. Unfortunately, other OS's do not. ALREADY_NONBLOCK = 1 << 2 // Indicates that the file descriptor is known already to be in non-blocking mode, so the flag // need not be set again. Otherwise, all wrap*Fd() methods will enable non-blocking mode // automatically. // // On Linux, all system calls which yield new file descriptors have flags or variants which // enable non-blocking mode immediately. Unfortunately, other OS's do not. #endif }; #if _WIN32 typedef uintptr_t Fd; typedef AutoCloseHandle OwnFd; // On Windows, the `fd` parameter to each of these methods must be a SOCKET, and must have the // flag WSA_FLAG_OVERLAPPED (which socket() uses by default, but WSASocket() wants you to specify // explicitly). #else typedef int Fd; typedef AutoCloseFd OwnFd; // On Unix, any arbitrary file descriptor is supported. #endif virtual Own<AsyncInputStream> wrapInputFd(Fd fd, uint flags = 0) = 0; // Create an AsyncInputStream wrapping a file descriptor. // // `flags` is a bitwise-OR of the values of the `Flags` enum. virtual Own<AsyncOutputStream> wrapOutputFd(Fd fd, uint flags = 0) = 0; // Create an AsyncOutputStream wrapping a file descriptor. // // `flags` is a bitwise-OR of the values of the `Flags` enum. virtual Own<AsyncIoStream> wrapSocketFd(Fd fd, uint flags = 0) = 0; // Create an AsyncIoStream wrapping a socket file descriptor. // // `flags` is a bitwise-OR of the values of the `Flags` enum. #if !_WIN32 virtual Own<AsyncCapabilityStream> wrapUnixSocketFd(Fd fd, uint flags = 0); // Like wrapSocketFd() but also support capability passing via SCM_RIGHTS. The socket must be // a Unix domain socket. // // The default implementation throws UNIMPLEMENTED, for backwards-compatibility with // LowLevelAsyncIoProvider implementations written before this method was added. #endif virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address. // The returned promise does not resolve until connection has completed. // // `flags` is a bitwise-OR of the values of the `Flags` enum. class NetworkFilter { public: virtual bool shouldAllow(const struct sockaddr* addr, uint addrlen) = 0; // Returns true if incoming connections or datagrams from the given peer should be accepted. // If false, they will be dropped. This is used to implement kj::Network::restrictPeers(). static NetworkFilter& getAllAllowed(); }; virtual Own<ConnectionReceiver> wrapListenSocketFd( Fd fd, NetworkFilter& filter, uint flags = 0) = 0; inline Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) { return wrapListenSocketFd(fd, NetworkFilter::getAllAllowed(), flags); } // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already // have had `bind()` and `listen()` called on it, so it's ready for `accept()`. // // `flags` is a bitwise-OR of the values of the `Flags` enum. virtual Own<DatagramPort> wrapDatagramSocketFd(Fd fd, NetworkFilter& filter, uint flags = 0); inline Own<DatagramPort> wrapDatagramSocketFd(Fd fd, uint flags = 0) { return wrapDatagramSocketFd(fd, NetworkFilter::getAllAllowed(), flags); } virtual Timer& getTimer() = 0; // Returns a `Timer` based on real time. Time does not pass while event handlers are running -- // it only updates when the event loop polls for system events. This means that calling `now()` // on this timer does not require a system call. // // This timer is not affected by changes to the system date. It is unspecified whether the timer // continues to count while the system is suspended. Own<AsyncInputStream> wrapInputFd(OwnFd&& fd, uint flags = 0); Own<AsyncOutputStream> wrapOutputFd(OwnFd&& fd, uint flags = 0); Own<AsyncIoStream> wrapSocketFd(OwnFd&& fd, uint flags = 0); #if !_WIN32 Own<AsyncCapabilityStream> wrapUnixSocketFd(OwnFd&& fd, uint flags = 0); #endif Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( OwnFd&& fd, const struct sockaddr* addr, uint addrlen, uint flags = 0); Own<ConnectionReceiver> wrapListenSocketFd( OwnFd&& fd, NetworkFilter& filter, uint flags = 0); Own<ConnectionReceiver> wrapListenSocketFd(OwnFd&& fd, uint flags = 0); Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, NetworkFilter& filter, uint flags = 0); Own<DatagramPort> wrapDatagramSocketFd(OwnFd&& fd, uint flags = 0); // Convenience wrappers which transfer ownership via AutoCloseFd (Unix) or AutoCloseHandle // (Windows). TAKE_OWNERSHIP will be implicitly added to `flags`. }; Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); // Make a new AsyncIoProvider wrapping a `LowLevelAsyncIoProvider`. struct AsyncIoContext { Own<LowLevelAsyncIoProvider> lowLevelProvider; Own<AsyncIoProvider> provider; WaitScope& waitScope; #if _WIN32 Win32EventPort& win32EventPort; #else UnixEventPort& unixEventPort; // TEMPORARY: Direct access to underlying UnixEventPort, mainly for waiting on signals. This // field will go away at some point when we have a chance to improve these interfaces. #endif }; AsyncIoContext setupAsyncIo(); // Convenience method which sets up the current thread with everything it needs to do async I/O. // The returned objects contain an `EventLoop` which is wrapping an appropriate `EventPort` for // doing I/O on the host system, so everything is ready for the thread to start making async calls // and waiting on promises. // // You would typically call this in your main() loop or in the start function of a thread. // Example: // // int main() { // auto ioContext = kj::setupAsyncIo(); // // // Now we can call an async function. // Promise<String> textPromise = getHttp(*ioContext.provider, "http://example.com"); // // // And we can wait for the promise to complete. Note that you can only use `wait()` // // from the top level, not from inside a promise callback. // String text = textPromise.wait(ioContext.waitScope); // print(text); // return 0; // } // // WARNING: An AsyncIoContext can only be used in the thread and process that created it. In // particular, note that after a fork(), an AsyncIoContext created in the parent process will // not work correctly in the child, even if the parent ceases to use its copy. In particular // note that this means that server processes which daemonize themselves at startup must wait // until after daemonization to create an AsyncIoContext. // ======================================================================================= // Convenience adapters. class CapabilityStreamConnectionReceiver final: public ConnectionReceiver { // Trivial wrapper which allows an AsyncCapabilityStream to act as a ConnectionReceiver. accept() // calls receiveStream(). public: CapabilityStreamConnectionReceiver(AsyncCapabilityStream& inner) : inner(inner) {} Promise<Own<AsyncIoStream>> accept() override; uint getPort() override; private: AsyncCapabilityStream& inner; }; class CapabilityStreamNetworkAddress final: public NetworkAddress { // Trivial wrapper which allows an AsyncCapabilityStream to act as a NetworkAddress. // // connect() is implemented by calling provider.newCapabilityPipe(), sending one end over the // original capability stream, and returning the other end. // // listen().accept() is implemented by receiving new streams over the original stream. // // Note that clone() dosen't work (due to ownership issues) and toString() returns a static // string. public: CapabilityStreamNetworkAddress(AsyncIoProvider& provider, AsyncCapabilityStream& inner) : provider(provider), inner(inner) {} Promise<Own<AsyncIoStream>> connect() override; Own<ConnectionReceiver> listen() override; Own<NetworkAddress> clone() override; String toString() override; private: AsyncIoProvider& provider; AsyncCapabilityStream& inner; }; // ======================================================================================= // inline implementation details inline AncillaryMessage::AncillaryMessage( int level, int type, ArrayPtr<const byte> data) : level(level), type(type), data(data) {} inline int AncillaryMessage::getLevel() const { return level; } inline int AncillaryMessage::getType() const { return type; } template <typename T> inline Maybe<const T&> AncillaryMessage::as() { if (data.size() >= sizeof(T)) { return *reinterpret_cast<const T*>(data.begin()); } else { return nullptr; } } template <typename T> inline ArrayPtr<const T> AncillaryMessage::asArray() { return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T)); } } // namespace kj #endif // KJ_ASYNC_IO_H_