Commit 68ad3220 authored by Kenton Varda's avatar Kenton Varda

Add interfaces for UDP networking, with support for receiving ancillary data.

parent 8b7fdebb
...@@ -221,5 +221,151 @@ TEST(AsyncIo, Timeouts) { ...@@ -221,5 +221,151 @@ TEST(AsyncIo, Timeouts) {
EXPECT_EQ(123, promise2.wait(ioContext.waitScope)); EXPECT_EQ(123, promise2.wait(ioContext.waitScope));
} }
TEST(AsyncIo, Udp) {
auto ioContext = setupAsyncIo();
auto addr = ioContext.provider->getNetwork().parseAddress("127.0.0.1").wait(ioContext.waitScope);
auto port1 = addr->bindDatagramPort();
auto port2 = addr->bindDatagramPort();
auto addr1 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port1->getPort())
.wait(ioContext.waitScope);
auto addr2 = ioContext.provider->getNetwork().parseAddress("127.0.0.1", port2->getPort())
.wait(ioContext.waitScope);
Own<NetworkAddress> receivedAddr;
{
// Send a message and receive it.
EXPECT_EQ(3, port1->send("foo", 3, *addr2).wait(ioContext.waitScope));
auto receiver = port2->makeReceiver();
receiver->receive().wait(ioContext.waitScope);
{
auto content = receiver->getContent();
EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
EXPECT_FALSE(content.isTruncated);
}
receivedAddr = receiver->getSource().clone();
EXPECT_EQ(addr1->toString(), receivedAddr->toString());
{
auto ancillary = receiver->getAncillary();
EXPECT_EQ(0, ancillary.value.size());
EXPECT_FALSE(ancillary.isTruncated);
}
// Receive a second message with the same receiver.
{
auto promise = receiver->receive(); // This time, start receiving before sending
EXPECT_EQ(6, port1->send("barbaz", 6, *addr2).wait(ioContext.waitScope));
promise.wait(ioContext.waitScope);
auto content = receiver->getContent();
EXPECT_EQ("barbaz", kj::heapString(content.value.asChars()));
EXPECT_FALSE(content.isTruncated);
}
}
DatagramReceiver::Capacity capacity;
capacity.content = 8;
capacity.ancillary = 1024;
{
// Send a reply that will be truncated.
EXPECT_EQ(16, port2->send("0123456789abcdef", 16, *receivedAddr).wait(ioContext.waitScope));
auto recv1 = port1->makeReceiver(capacity);
recv1->receive().wait(ioContext.waitScope);
{
auto content = recv1->getContent();
EXPECT_EQ("01234567", kj::heapString(content.value.asChars()));
EXPECT_TRUE(content.isTruncated);
}
EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
{
auto ancillary = recv1->getAncillary();
EXPECT_EQ(0, ancillary.value.size());
EXPECT_FALSE(ancillary.isTruncated);
}
#ifdef IP_PKTINFO
// Set IP_PKTINFO header and try to receive it.
int one = 1;
port1->setsockopt(IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
EXPECT_EQ(3, port2->send("foo", 3, *addr1).wait(ioContext.waitScope));
recv1->receive().wait(ioContext.waitScope);
{
auto content = recv1->getContent();
EXPECT_EQ("foo", kj::heapString(content.value.asChars()));
EXPECT_FALSE(content.isTruncated);
}
EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
{
auto ancillary = recv1->getAncillary();
EXPECT_FALSE(ancillary.isTruncated);
ASSERT_EQ(1, ancillary.value.size());
auto message = ancillary.value[0];
EXPECT_EQ(IPPROTO_IP, message.getLevel());
EXPECT_EQ(IP_PKTINFO, message.getType());
EXPECT_EQ(sizeof(struct in_pktinfo), message.asArray<byte>().size());
auto& pktinfo = KJ_ASSERT_NONNULL(message.as<struct in_pktinfo>());
EXPECT_EQ(htonl(0x7F000001), pktinfo.ipi_addr.s_addr); // 127.0.0.1
}
// See what happens if there's not quite enough space for in_pktinfo.
capacity.ancillary = CMSG_SPACE(sizeof(struct in_pktinfo)) - 8;
recv1 = port1->makeReceiver(capacity);
EXPECT_EQ(3, port2->send("bar", 3, *addr1).wait(ioContext.waitScope));
recv1->receive().wait(ioContext.waitScope);
{
auto content = recv1->getContent();
EXPECT_EQ("bar", kj::heapString(content.value.asChars()));
EXPECT_FALSE(content.isTruncated);
}
EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
{
auto ancillary = recv1->getAncillary();
EXPECT_TRUE(ancillary.isTruncated);
// We might get a message, but it will be truncated.
if (ancillary.value.size() != 0) {
EXPECT_EQ(1, ancillary.value.size());
auto message = ancillary.value[0];
EXPECT_EQ(IPPROTO_IP, message.getLevel());
EXPECT_EQ(IP_PKTINFO, message.getType());
EXPECT_TRUE(message.as<struct in_pktinfo>() == nullptr);
EXPECT_LT(message.asArray<byte>().size(), sizeof(struct in_pktinfo));
}
}
// See what happens if there's not enough space even for the cmsghdr.
capacity.ancillary = CMSG_SPACE(0) - 8;
recv1 = port1->makeReceiver(capacity);
EXPECT_EQ(3, port2->send("baz", 3, *addr1).wait(ioContext.waitScope));
recv1->receive().wait(ioContext.waitScope);
{
auto content = recv1->getContent();
EXPECT_EQ("baz", kj::heapString(content.value.asChars()));
EXPECT_FALSE(content.isTruncated);
}
EXPECT_EQ(addr2->toString(), recv1->getSource().toString());
{
auto ancillary = recv1->getAncillary();
EXPECT_TRUE(ancillary.isTruncated);
EXPECT_EQ(0, ancillary.value.size());
}
#endif
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
This diff is collapsed.
...@@ -34,6 +34,10 @@ ...@@ -34,6 +34,10 @@
namespace kj { namespace kj {
class UnixEventPort; class UnixEventPort;
class NetworkAddress;
// =======================================================================================
// Streaming I/O
class AsyncInputStream { class AsyncInputStream {
// Asynchronous equivalent of InputStream (from io.h). // Asynchronous equivalent of InputStream (from io.h).
...@@ -59,6 +63,26 @@ class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { ...@@ -59,6 +63,26 @@ class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
public: public:
virtual void shutdownWrite() = 0; virtual void shutdownWrite() = 0;
// Cleanly shut down just the write end of the stream, while keeping the read end open. // Cleanly shut down just the write end of the stream, while keeping the read end open.
virtual void getsockopt(int level, int option, void* value, uint* length);
virtual void setsockopt(int level, int option, const void* value, uint length);
// Corresponds to getsockopt() and setsockopt() syscalls. Will throw an "unimplemented" exception
// if the stream is not a socket or the option is not appropriate for the socket type. The
// default implementations always throw "unimplemented".
};
struct OneWayPipe {
// A data pipe with an input end and an output end. (Typically backed by pipe() system call.)
Own<AsyncInputStream> in;
Own<AsyncOutputStream> out;
};
struct TwoWayPipe {
// A data pipe that supports sending in both directions. Each end's output sends data to the
// other end's input. (Typically backed by socketpair() system call.)
Own<AsyncIoStream> ends[2];
}; };
class ConnectionReceiver { class ConnectionReceiver {
...@@ -70,9 +94,120 @@ public: ...@@ -70,9 +94,120 @@ public:
virtual uint getPort() = 0; virtual uint getPort() = 0;
// Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't // Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
// specify a port when constructing the LocalAddress -- one will have been assigned automatically. // specify a port when constructing the NetworkAddress -- one will have been assigned
// automatically.
virtual void getsockopt(int level, int option, void* value, uint* length);
virtual void setsockopt(int level, int option, const void* value, uint length);
// Same as the methods of AsyncIoStream.
};
// =======================================================================================
// Datagram I/O
class AncillaryMessage {
// Represents an ancillary message (aka control message) received using the recvmsg() system
// call (or equivalent). Most apps will not use this.
public:
inline AncillaryMessage(int level, int type, ArrayPtr<const byte> data);
AncillaryMessage() = default;
inline int getLevel() const;
// Originating protocol / socket level.
inline int getType() const;
// Protocol-specific message type.
template <typename T>
inline Maybe<const T&> as();
// Interpret the ancillary message as the given struct type. Most ancillary messages are some
// sort of struct, so this is a convenient way to access it. Returns nullptr if the message
// is smaller than the struct -- this can happen if the message was truncated due to
// insufficient ancillary buffer space.
template <typename T>
inline ArrayPtr<const T> asArray();
// Interpret the ancillary message as an array of items. If the message size does not evenly
// divide into elements of type T, the remainder is discarded -- this can happen if the message
// was truncated due to insufficient ancillary buffer space.
private:
int level;
int type;
ArrayPtr<const byte> data;
// Message data. In most cases you should use `as()` or `asArray()`.
};
class DatagramReceiver {
// Class encapsulating the recvmsg() system call. You must specify the DatagramReceiver's
// capacity in advance; if a received packet is larger than the capacity, it will be truncated.
public:
virtual Promise<void> receive() = 0;
// Receive a new message, overwriting this object's content.
//
// receive() may reuse the same buffers for content and ancillary data with each call.
template <typename T>
struct MaybeTruncated {
T value;
bool isTruncated;
// True if the Receiver's capacity was insufficient to receive the value and therefore the
// value is truncated.
};
virtual MaybeTruncated<ArrayPtr<const byte>> getContent() = 0;
// Get the content of the datagram.
virtual MaybeTruncated<ArrayPtr<const AncillaryMessage>> getAncillary() = 0;
// Ancilarry messages received with the datagram. See the recvmsg() system call and the cmsghdr
// struct. Most apps don't need this.
//
// If the returned value is truncated, then the last message in the array may itself be
// truncated, meaning its as<T>() method will return nullptr or its asArray<T>() method will
// return fewer elements than expected. Truncation can also mean that additional messages were
// available but discarded.
virtual NetworkAddress& getSource() = 0;
// Get the datagram sender's address.
struct Capacity {
size_t content = 8192;
// How much space to allocate for the datagram content. If a datagram is received that is
// larger than this, it will be truncated, with no way to recover the tail.
size_t ancillary = 0;
// How much space to allocate for ancillary messages. As with content, if the ancillary data
// is larger than this, it will be truncated.
};
};
class DatagramPort {
public:
virtual Promise<size_t> send(const void* buffer, size_t size, NetworkAddress& destination) = 0;
virtual Promise<size_t> send(ArrayPtr<const ArrayPtr<const byte>> pieces,
NetworkAddress& destination) = 0;
virtual Own<DatagramReceiver> makeReceiver(
DatagramReceiver::Capacity capacity = DatagramReceiver::Capacity()) = 0;
// Create a new `Receiver` that can be used to receive datagrams. `capacity` specifies how much
// space to allocate for the received message. The `DatagramPort` must outlive the `Receiver`.
virtual uint getPort() = 0;
// Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
// specify a port when constructing the NetworkAddress -- one will have been assigned
// automatically.
virtual void getsockopt(int level, int option, void* value, uint* length);
virtual void setsockopt(int level, int option, const void* value, uint length);
// Same as the methods of AsyncIoStream.
}; };
// =======================================================================================
// Networks
class NetworkAddress { class NetworkAddress {
// Represents a remote address to which the application can connect. // Represents a remote address to which the application can connect.
...@@ -87,6 +222,14 @@ public: ...@@ -87,6 +222,14 @@ public:
// //
// The address must be local. // The address must be local.
virtual Own<DatagramPort> bindDatagramPort();
// Open this address as a datagram (e.g. UDP) port.
//
// The address must be local.
virtual Own<NetworkAddress> clone() = 0;
// Returns an equivalent copy of this NetworkAddress.
virtual String toString() = 0; virtual String toString() = 0;
// Produce a human-readable string which hopefully can be passed to Network::parseRemoteAddress() // Produce a human-readable string which hopefully can be passed to Network::parseRemoteAddress()
// to reproduce this address, although whether or not that works of course depends on the Network // to reproduce this address, although whether or not that works of course depends on the Network
...@@ -123,26 +266,15 @@ public: ...@@ -123,26 +266,15 @@ public:
// `portHint`, if provided, specifies the "standard" IP port number for the application-level // `portHint`, if provided, specifies the "standard" IP port number for the application-level
// service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a // service in play. If the address turns out to be an IP address (v4 or v6), and it lacks a
// port number, this port will be used. If `addr` lacks a port number *and* `portHint` is // port number, this port will be used. If `addr` lacks a port number *and* `portHint` is
// omitted, then the returned address will only support listen() (not connect()), and a port // omitted, then the returned address will only support listen() and bindDatagramPort()
// will be chosen when listen() is called. // (not connect()), and an unused port will be chosen each time one of those methods is called.
virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0; virtual Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) = 0;
// Construct a network address from a legacy struct sockaddr. // Construct a network address from a legacy struct sockaddr.
}; };
struct OneWayPipe { // =======================================================================================
// A data pipe with an input end and an output end. (Typically backed by pipe() system call.) // I/O Provider
Own<AsyncInputStream> in;
Own<AsyncOutputStream> out;
};
struct TwoWayPipe {
// A data pipe that supports sending in both directions. Each end's output sends data to the
// other end's input. (Typically backed by socketpair() system call.)
Own<AsyncIoStream> ends[2];
};
class AsyncIoProvider { class AsyncIoProvider {
// Class which constructs asynchronous wrappers around the operating system's I/O facilities. // Class which constructs asynchronous wrappers around the operating system's I/O facilities.
...@@ -284,6 +416,8 @@ public: ...@@ -284,6 +416,8 @@ public:
// //
// `flags` is a bitwise-OR of the values of the `Flags` enum. // `flags` is a bitwise-OR of the values of the `Flags` enum.
virtual Own<DatagramPort> wrapDatagramSocketFd(int fd, uint flags = 0);
virtual Timer& getTimer() = 0; virtual Timer& getTimer() = 0;
// Returns a `Timer` based on real time. Time does not pass while event handlers are running -- // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
// it only updates when the event loop polls for system events. This means that calling `now()` // it only updates when the event loop polls for system events. This means that calling `now()`
...@@ -328,6 +462,30 @@ AsyncIoContext setupAsyncIo(); ...@@ -328,6 +462,30 @@ AsyncIoContext setupAsyncIo();
// return 0; // return 0;
// } // }
// =======================================================================================
// inline implementation details
inline AncillaryMessage::AncillaryMessage(
int level, int type, ArrayPtr<const byte> data)
: level(level), type(type), data(data) {}
inline int AncillaryMessage::getLevel() const { return level; }
inline int AncillaryMessage::getType() const { return type; }
template <typename T>
inline Maybe<const T&> AncillaryMessage::as() {
if (data.size() >= sizeof(T)) {
return *reinterpret_cast<const T*>(data.begin());
} else {
return nullptr;
}
}
template <typename T>
inline ArrayPtr<const T> AncillaryMessage::asArray() {
return arrayPtr(reinterpret_cast<const T*>(data.begin()), data.size() / sizeof(T));
}
} // namespace kj } // namespace kj
#endif // KJ_ASYNC_IO_H_ #endif // KJ_ASYNC_IO_H_
...@@ -797,6 +797,16 @@ public: ...@@ -797,6 +797,16 @@ public:
inline operator T*() { return isSet ? &value : nullptr; } inline operator T*() { return isSet ? &value : nullptr; }
inline operator const T*() const { return isSet ? &value : nullptr; } inline operator const T*() const { return isSet ? &value : nullptr; }
template <typename... Params>
inline void emplace(Params&&... params) {
if (isSet) {
isSet = false;
dtor(value);
}
ctor(value, kj::fwd<Params>(params)...);
isSet = true;
}
private: // internal interface used by friends only private: // internal interface used by friends only
inline NullableValue() noexcept: isSet(false) {} inline NullableValue() noexcept: isSet(false) {}
inline NullableValue(T&& t) noexcept(noexcept(T(instance<T&&>()))) inline NullableValue(T&& t) noexcept(noexcept(T(instance<T&&>())))
...@@ -959,6 +969,14 @@ public: ...@@ -959,6 +969,14 @@ public:
Maybe(decltype(nullptr)) noexcept: ptr(nullptr) {} Maybe(decltype(nullptr)) noexcept: ptr(nullptr) {}
template <typename... Params>
inline void emplace(Params&&... params) {
// Replace this Maybe's content with a new value constructed by passing the given parametrs to
// T's constructor. This can be used to initialize a Maybe without copying or even moving a T.
ptr.emplace(kj::fwd<Params>(params)...);
}
inline Maybe& operator=(Maybe&& other) { ptr = kj::mv(other.ptr); return *this; } inline Maybe& operator=(Maybe&& other) { ptr = kj::mv(other.ptr); return *this; }
inline Maybe& operator=(Maybe& other) { ptr = other.ptr; return *this; } inline Maybe& operator=(Maybe& other) { ptr = other.ptr; return *this; }
inline Maybe& operator=(const Maybe& other) { ptr = other.ptr; return *this; } inline Maybe& operator=(const Maybe& other) { ptr = other.ptr; return *this; }
......
...@@ -19,7 +19,7 @@ while [ $# -gt 0 ]; do ...@@ -19,7 +19,7 @@ while [ $# -gt 0 ]; do
caffeinate ) caffeinate )
# Re-run preventing sleep. # Re-run preventing sleep.
shift shift
exec caffeinate $0 $@ exec caffeinate -ims $0 $@
;; ;;
tmpdir ) tmpdir )
# Clone to a temp directory. # Clone to a temp directory.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment