Commit 5de44367 authored by Kenton Varda's avatar Kenton Varda

Misc tweaks.

parent b365bc9b
...@@ -181,7 +181,7 @@ public: ...@@ -181,7 +181,7 @@ public:
class BrokenClient final: public ClientHook, public kj::Refcounted { class BrokenClient final: public ClientHook, public kj::Refcounted {
public: public:
BrokenClient(const kj::Exception& exception): exception(exception) {} BrokenClient(const kj::Exception& exception): exception(exception) {}
BrokenClient(const char* description) BrokenClient(const kj::StringPtr description)
: exception(kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT, : exception(kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
"", 0, kj::str(description)) {} "", 0, kj::str(description)) {}
...@@ -220,7 +220,7 @@ kj::Own<const ClientHook> BrokenPipeline::getPipelinedCap( ...@@ -220,7 +220,7 @@ kj::Own<const ClientHook> BrokenPipeline::getPipelinedCap(
} // namespace } // namespace
kj::Own<const ClientHook> newBrokenCap(const char* reason) { kj::Own<const ClientHook> newBrokenCap(kj::StringPtr reason) {
return kj::refcounted<BrokenClient>(reason); return kj::refcounted<BrokenClient>(reason);
} }
......
...@@ -207,7 +207,7 @@ private: ...@@ -207,7 +207,7 @@ private:
void dropCap(_::LocalCapDescriptor::Reader descriptor) const override; void dropCap(_::LocalCapDescriptor::Reader descriptor) const override;
}; };
kj::Own<const ClientHook> newBrokenCap(const char* reason); kj::Own<const ClientHook> newBrokenCap(kj::StringPtr reason);
kj::Own<const ClientHook> newBrokenCap(kj::Exception&& reason); kj::Own<const ClientHook> newBrokenCap(kj::Exception&& reason);
// Helper function that creates a capability which simply throws exceptions when called. // Helper function that creates a capability which simply throws exceptions when called.
......
...@@ -40,7 +40,7 @@ TEST(AsyncIo, SimpleNetwork) { ...@@ -40,7 +40,7 @@ TEST(AsyncIo, SimpleNetwork) {
UnixEventLoop loop; UnixEventLoop loop;
DummyErrorHandler dummyHandler; DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler); TaskSet tasks(loop, dummyHandler);
auto& network = getOperatingSystemSingleton().getNetwork(); auto network = Network::newSystemNetwork();
Own<ConnectionReceiver> listener; Own<ConnectionReceiver> listener;
Own<AsyncIoStream> server; Own<AsyncIoStream> server;
...@@ -53,7 +53,7 @@ TEST(AsyncIo, SimpleNetwork) { ...@@ -53,7 +53,7 @@ TEST(AsyncIo, SimpleNetwork) {
tasks.add(loop.evalLater([&]() { tasks.add(loop.evalLater([&]() {
return port.promise return port.promise
.then([&](uint portnum) { .then([&](uint portnum) {
return network.parseRemoteAddress("127.0.0.1", portnum); return network->parseRemoteAddress("127.0.0.1", portnum);
}).then([&](Own<RemoteAddress>&& result) { }).then([&](Own<RemoteAddress>&& result) {
return result->connect(); return result->connect();
}).then([&](Own<AsyncIoStream>&& result) { }).then([&](Own<AsyncIoStream>&& result) {
...@@ -63,7 +63,7 @@ TEST(AsyncIo, SimpleNetwork) { ...@@ -63,7 +63,7 @@ TEST(AsyncIo, SimpleNetwork) {
})); }));
kj::String result = loop.wait(loop.evalLater([&]() { kj::String result = loop.wait(loop.evalLater([&]() {
return network.parseLocalAddress("*") return network->parseLocalAddress("*")
.then([&](Own<LocalAddress>&& result) { .then([&](Own<LocalAddress>&& result) {
listener = result->listen(); listener = result->listen();
port.fulfiller->fulfill(listener->getPort()); port.fulfiller->fulfill(listener->getPort());
...@@ -94,18 +94,18 @@ String tryParseRemote(EventLoop& loop, Network& network, StringPtr text, uint po ...@@ -94,18 +94,18 @@ String tryParseRemote(EventLoop& loop, Network& network, StringPtr text, uint po
TEST(AsyncIo, AddressParsing) { TEST(AsyncIo, AddressParsing) {
UnixEventLoop loop; UnixEventLoop loop;
auto& network = getOperatingSystemSingleton().getNetwork(); auto network = Network::newSystemNetwork();
EXPECT_EQ("*:0", tryParseLocal(loop, network, "*")); EXPECT_EQ("*:0", tryParseLocal(loop, *network, "*"));
EXPECT_EQ("*:123", tryParseLocal(loop, network, "123")); EXPECT_EQ("*:123", tryParseLocal(loop, *network, "123"));
EXPECT_EQ("*:123", tryParseLocal(loop, network, ":123")); EXPECT_EQ("*:123", tryParseLocal(loop, *network, ":123"));
EXPECT_EQ("[::]:123", tryParseLocal(loop, network, "0::0", 123)); EXPECT_EQ("[::]:123", tryParseLocal(loop, *network, "0::0", 123));
EXPECT_EQ("0.0.0.0:0", tryParseLocal(loop, network, "0.0.0.0")); EXPECT_EQ("0.0.0.0:0", tryParseLocal(loop, *network, "0.0.0.0"));
EXPECT_EQ("1.2.3.4:5678", tryParseRemote(loop, network, "1.2.3.4", 5678)); EXPECT_EQ("1.2.3.4:5678", tryParseRemote(loop, *network, "1.2.3.4", 5678));
EXPECT_EQ("[12ab:cd::34]:321", tryParseRemote(loop, network, "[12ab:cd:0::0:34]:321", 432)); EXPECT_EQ("[12ab:cd::34]:321", tryParseRemote(loop, *network, "[12ab:cd:0::0:34]:321", 432));
EXPECT_EQ("unix:foo/bar/baz", tryParseLocal(loop, network, "unix:foo/bar/baz")); EXPECT_EQ("unix:foo/bar/baz", tryParseLocal(loop, *network, "unix:foo/bar/baz"));
EXPECT_EQ("unix:foo/bar/baz", tryParseRemote(loop, network, "unix:foo/bar/baz")); EXPECT_EQ("unix:foo/bar/baz", tryParseRemote(loop, *network, "unix:foo/bar/baz"));
} }
TEST(AsyncIo, OneWayPipe) { TEST(AsyncIo, OneWayPipe) {
......
...@@ -582,51 +582,27 @@ private: ...@@ -582,51 +582,27 @@ private:
class SocketNetwork final: public Network { class SocketNetwork final: public Network {
public: public:
Promise<Own<LocalAddress>> parseLocalAddress(StringPtr addr, uint portHint = 0) override { Promise<Own<LocalAddress>> parseLocalAddress(StringPtr addr, uint portHint = 0) const override {
return EventLoop::current().evalLater(mvCapture(heapString(addr), return EventLoop::current().evalLater(mvCapture(heapString(addr),
[portHint](String&& addr) -> Own<LocalAddress> { [portHint](String&& addr) -> Own<LocalAddress> {
return heap<LocalSocketAddress>(SocketAddress::parseLocal(addr, portHint)); return heap<LocalSocketAddress>(SocketAddress::parseLocal(addr, portHint));
})); }));
} }
Promise<Own<RemoteAddress>> parseRemoteAddress(StringPtr addr, uint portHint = 0) override { Promise<Own<RemoteAddress>> parseRemoteAddress(StringPtr addr, uint portHint = 0) const override {
return EventLoop::current().evalLater(mvCapture(heapString(addr), return EventLoop::current().evalLater(mvCapture(heapString(addr),
[portHint](String&& addr) -> Own<RemoteAddress> { [portHint](String&& addr) -> Own<RemoteAddress> {
return heap<RemoteSocketAddress>(SocketAddress::parse(addr, portHint)); return heap<RemoteSocketAddress>(SocketAddress::parse(addr, portHint));
})); }));
} }
Own<LocalAddress> getLocalSockaddr(const void* sockaddr, uint len) override { Own<LocalAddress> getLocalSockaddr(const void* sockaddr, uint len) const override {
return Own<LocalAddress>(heap<LocalSocketAddress>(SocketAddress(sockaddr, len))); return Own<LocalAddress>(heap<LocalSocketAddress>(SocketAddress(sockaddr, len)));
} }
Own<RemoteAddress> getRemoteSockaddr(const void* sockaddr, uint len) override { Own<RemoteAddress> getRemoteSockaddr(const void* sockaddr, uint len) const override {
return Own<RemoteAddress>(heap<RemoteSocketAddress>(SocketAddress(sockaddr, len))); return Own<RemoteAddress>(heap<RemoteSocketAddress>(SocketAddress(sockaddr, len)));
} }
}; };
class UnixKernel: public OperatingSystem {
public:
UnixKernel()
: standardIo(STDIN_FILENO, STDOUT_FILENO),
standardError(-1, STDERR_FILENO) {}
AsyncIoStream& getStandardIo() override {
return standardIo;
}
AsyncOutputStream& getStandardError() override {
return standardError;
}
Network& getNetwork() override {
return network;
}
private:
AsyncStreamFd standardIo;
AsyncStreamFd standardError;
SocketNetwork network;
};
} // namespace } // namespace
Promise<void> AsyncInputStream::read(void* buffer, size_t bytes) { Promise<void> AsyncInputStream::read(void* buffer, size_t bytes) {
...@@ -648,6 +624,10 @@ Own<AsyncIoStream> AsyncIoStream::wrapFd(int fd) { ...@@ -648,6 +624,10 @@ Own<AsyncIoStream> AsyncIoStream::wrapFd(int fd) {
return heap<AsyncStreamFd>(fd, fd); return heap<AsyncStreamFd>(fd, fd);
} }
Own<Network> Network::newSystemNetwork() {
return heap<SocketNetwork>();
}
OneWayPipe newOneWayPipe() { OneWayPipe newOneWayPipe() {
int fds[2]; int fds[2];
#if __linux__ #if __linux__
...@@ -668,9 +648,4 @@ TwoWayPipe newTwoWayPipe() { ...@@ -668,9 +648,4 @@ TwoWayPipe newTwoWayPipe() {
return TwoWayPipe { { heap<Socket>(fds[0]), heap<Socket>(fds[1]) } }; return TwoWayPipe { { heap<Socket>(fds[0]), heap<Socket>(fds[1]) } };
} }
OperatingSystem& getOperatingSystemSingleton() {
static UnixKernel os;
return os;
}
} // namespace kj } // namespace kj
...@@ -41,6 +41,8 @@ public: ...@@ -41,6 +41,8 @@ public:
// Create an AsyncInputStream wrapping a file descriptor. // Create an AsyncInputStream wrapping a file descriptor.
// //
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already. // This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
//
// The returned object can only be called from within a system event loop (e.g. `UnixEventLoop`).
}; };
class AsyncOutputStream { class AsyncOutputStream {
...@@ -54,6 +56,8 @@ public: ...@@ -54,6 +56,8 @@ public:
// Create an AsyncOutputStream wrapping a file descriptor. // Create an AsyncOutputStream wrapping a file descriptor.
// //
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already. // This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
//
// The returned object can only be called from within a system event loop (e.g. `UnixEventLoop`).
}; };
class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream { class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
...@@ -64,6 +68,8 @@ public: ...@@ -64,6 +68,8 @@ public:
// Create an AsyncIoStream wrapping a file descriptor. // Create an AsyncIoStream wrapping a file descriptor.
// //
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already. // This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
//
// The returned object can only be called from within a system event loop (e.g. `UnixEventLoop`).
}; };
class ConnectionReceiver { class ConnectionReceiver {
...@@ -115,8 +121,33 @@ class Network { ...@@ -115,8 +121,33 @@ class Network {
// LocalAddress and/or RemoteAddress instances directly and work from there, if at all possible. // LocalAddress and/or RemoteAddress instances directly and work from there, if at all possible.
public: public:
virtual Promise<Own<LocalAddress>> parseLocalAddress(StringPtr addr, uint portHint = 0) = 0; static Own<Network> newSystemNetwork();
virtual Promise<Own<RemoteAddress>> parseRemoteAddress(StringPtr addr, uint portHint = 0) = 0; // Creates a new `Network` instance representing the networks exposed by the operating system.
//
// DO NOT CALL THIS except at the highest levels of your code, ideally in the main() function. If
// you call this from low-level code, then you are preventing higher-level code from injecting an
// alternative implementation. Instead, if your code needs to use network functionality, it
// should ask for a `Network` as a constructor or method parameter, so that higher-level code can
// chose what implementation to use. The system network is essentially a singleton. See:
// http://www.object-oriented-security.org/lets-argue/singletons
//
// Code that uses the system network should not make any assumptions about what kinds of
// addresses it will parse, as this could differ across platforms. String addresses should come
// strictly from the user, who will know how to write them correctly for their system.
//
// With that said, KJ currently supports the following string address formats:
// - IPv4: "1.2.3.4", "1.2.3.4:80"
// - IPv6: "1234:5678::abcd", "[1234:5678::abcd]:80"
// - Local IP wildcard (local addresses only; covers both v4 and v6): "*", "*:80", ":80", "80"
// - Unix domain: "unix:/path/to/socket"
//
// The system network -- and all objects it creates -- can only be used from threads running
// a system event loop (e.g. `UnixEventLoop`).
virtual Promise<Own<LocalAddress>> parseLocalAddress(
StringPtr addr, uint portHint = 0) const = 0;
virtual Promise<Own<RemoteAddress>> parseRemoteAddress(
StringPtr addr, uint portHint = 0) const = 0;
// Construct a local or remote address from a user-provided string. The format of the address // Construct a local or remote address from a user-provided string. The format of the address
// strings is not specified at the API level, and application code should make no assumptions // strings is not specified at the API level, and application code should make no assumptions
// about them. These strings should always be provided by humans, and said humans will know // about them. These strings should always be provided by humans, and said humans will know
...@@ -129,27 +160,11 @@ public: ...@@ -129,27 +160,11 @@ public:
// In practice, a local address is usually just a port number (or even an empty string, if a // In practice, a local address is usually just a port number (or even an empty string, if a
// reasonable `portHint` is provided), whereas a remote address usually requires a hostname. // reasonable `portHint` is provided), whereas a remote address usually requires a hostname.
virtual Own<LocalAddress> getLocalSockaddr(const void* sockaddr, uint len) = 0; virtual Own<LocalAddress> getLocalSockaddr(const void* sockaddr, uint len) const = 0;
virtual Own<RemoteAddress> getRemoteSockaddr(const void* sockaddr, uint len) = 0; virtual Own<RemoteAddress> getRemoteSockaddr(const void* sockaddr, uint len) const = 0;
// Construct a local or remote address from a legacy struct sockaddr. // Construct a local or remote address from a legacy struct sockaddr.
}; };
class OperatingSystem {
// Interface representing the I/O facilities offered to a process by the operating system. This
// interface usually should be used only in the highest levels of the application, in order to
// set up the right connections to pass down to lower levels that do the actual work.
public:
virtual AsyncIoStream& getStandardIo() = 0;
virtual AsyncOutputStream& getStandardError() = 0;
virtual Network& getNetwork() = 0;
// TODO(someday): Filesystem. Should it even be async?
// virtual Directory& getCurrentDir() = 0;
// virtual Directory& getRootDir() = 0;
};
struct OneWayPipe { struct OneWayPipe {
Own<AsyncInputStream> in; Own<AsyncInputStream> in;
Own<AsyncOutputStream> out; Own<AsyncOutputStream> out;
...@@ -165,18 +180,6 @@ TwoWayPipe newTwoWayPipe(); ...@@ -165,18 +180,6 @@ TwoWayPipe newTwoWayPipe();
// Creates two AsyncIoStreams representing the two ends of a two-way OS pipe (created with // Creates two AsyncIoStreams representing the two ends of a two-way OS pipe (created with
// socketpair(2)). Data written to one end can be read from the other. // socketpair(2)). Data written to one end can be read from the other.
OperatingSystem& getOperatingSystemSingleton();
// Get the EVIL singleton instance of OperatingSystem representing the real kernel.
//
// DO NOT USE THIS except at the highest levels of your code, ideally in the main() function. If
// you call this from low-level code, then you are preventing higher-level code from injecting an
// alternative implementation. Instead, if your code needs to use OS functionality, it should ask
// for an OperatingSystem as a parameter. See:
// http://www.object-oriented-security.org/lets-argue/singletons
//
// If you use KJ_MAIN, you never have to call this at all, because your main function will receive
// an OperatingSystem as part of the process context.
} // namespace kj } // namespace kj
#endif // KJ_ASYNC_IO_H_ #endif // KJ_ASYNC_IO_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment