Commit ed03e100 authored by Kenton Varda's avatar Kenton Varda

EZ RPC interface for quickly setting up clients and servers.

parent 46475bd7
......@@ -169,10 +169,12 @@ includecapnp_HEADERS = \
src/capnp/serialize-packed.h \
src/capnp/pointer-helpers.h \
src/capnp/generated-header-support.h \
src/capnp/rpc-prelude.h \
src/capnp/rpc.h \
src/capnp/rpc-twoparty.h \
src/capnp/rpc.capnp.h \
src/capnp/rpc-twoparty.capnp.h
src/capnp/rpc-twoparty.capnp.h \
src/capnp/ez-rpc.h
lib_LTLIBRARIES = libkj.la libkj-async.la libcapnp.la libcapnp-rpc.la libcapnpc.la
......@@ -235,7 +237,8 @@ libcapnp_rpc_la_SOURCES= \
src/capnp/rpc.c++ \
src/capnp/rpc.capnp.c++ \
src/capnp/rpc-twoparty.c++ \
src/capnp/rpc-twoparty.capnp.c++
src/capnp/rpc-twoparty.capnp.c++ \
src/capnp/ez-rpc.c++
# -lpthread is here to work around https://bugzilla.redhat.com/show_bug.cgi?id=661333
libcapnpc_la_LIBADD = libcapnp.la libkj.la $(PTHREAD_LIBS) -lpthread
......@@ -368,6 +371,7 @@ capnp_test_SOURCES = \
src/capnp/serialize-packed-test.c++ \
src/capnp/rpc-test.c++ \
src/capnp/rpc-twoparty-test.c++ \
src/capnp/ez-rpc-test.c++ \
src/capnp/test-util.c++ \
src/capnp/test-util.h \
src/capnp/compiler/lexer-test.c++ \
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "ez-rpc.h"
#include "test-util.h"
#include <gtest/gtest.h>
namespace capnp {
namespace _ {
namespace {
TEST(EzRpc, Basic) {
EzRpcServer server("127.0.0.1");
int callCount = 0;
server.exportCap("cap1", kj::heap<TestInterfaceImpl>(callCount));
server.exportCap("cap2", kj::heap<TestCallOrderImpl>());
EzRpcClient client("127.0.0.1", server.getPort().wait());
auto cap = client.importCap<test::TestInterface>("cap1");
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
EXPECT_EQ(0, callCount);
auto response = request.send().wait();
EXPECT_EQ("foo", response.getX());
EXPECT_EQ(1, callCount);
EXPECT_EQ(0, client.importCap("cap2").castAs<test::TestCallOrder>()
.getCallSequenceRequest().send().wait().getN());
EXPECT_EQ(1, client.importCap("cap2").castAs<test::TestCallOrder>()
.getCallSequenceRequest().send().wait().getN());
}
} // namespace
} // namespace _
} // namespace capnp
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "ez-rpc.h"
#include "rpc-twoparty.h"
#include <capnp/rpc.capnp.h>
#include <kj/async-io.h>
#include <kj/debug.h>
#include <map>
namespace capnp {
static __thread EzRpcContext* threadEzContext = nullptr;
class EzRpcContext: public kj::Refcounted {
public:
EzRpcContext(): ioProvider(kj::setupIoEventLoop()) {
threadEzContext = this;
}
~EzRpcContext() noexcept(false) {
KJ_REQUIRE(threadEzContext == this,
"EzRpcContext destroyed from different thread than it was created.") {
return;
}
threadEzContext = nullptr;
}
kj::AsyncIoProvider& getIoProvider() {
return *ioProvider;
}
static kj::Own<EzRpcContext> getThreadLocal() {
EzRpcContext* existing = threadEzContext;
if (existing != nullptr) {
return kj::addRef(*existing);
} else {
return kj::refcounted<EzRpcContext>();
}
}
private:
kj::Own<kj::AsyncIoProvider> ioProvider;
};
// =======================================================================================
struct EzRpcClient::Impl {
kj::Own<EzRpcContext> context;
struct ClientContext {
kj::Own<kj::AsyncIoStream> stream;
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
ClientContext(kj::Own<kj::AsyncIoStream>&& stream)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::CLIENT),
rpcSystem(makeRpcClient(network)) {}
Capability::Client restore(kj::StringPtr name) {
word scratch[64];
memset(scratch, 0, sizeof(scratch));
MallocMessageBuilder message(scratch);
auto root = message.getRoot<rpc::SturdyRef>();
auto hostId = root.getHostId().getAs<rpc::twoparty::SturdyRefHostId>();
hostId.setSide(rpc::twoparty::Side::SERVER);
root.getObjectId().setAs<Text>(name);
return rpcSystem.restore(hostId, root.getObjectId());
}
};
kj::ForkedPromise<void> setupPromise;
kj::Maybe<kj::Own<ClientContext>> clientContext;
// Filled in before `setupPromise` resolves.
Impl(kj::StringPtr serverAddress, uint defaultPort)
: context(EzRpcContext::getThreadLocal()),
setupPromise(context->getIoProvider().getNetwork()
.parseRemoteAddress(serverAddress, defaultPort)
.then([](kj::Own<kj::RemoteAddress>&& addr) {
return addr->connect();
}).then([this](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream));
}).fork()) {}
Impl(struct sockaddr* serverAddress, uint addrSize)
: context(EzRpcContext::getThreadLocal()),
setupPromise(context->getIoProvider().getNetwork()
.getRemoteSockaddr(serverAddress, addrSize)->connect()
.then([this](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream));
}).fork()) {}
Impl(int socketFd)
: context(EzRpcContext::getThreadLocal()),
setupPromise(kj::Promise<void>(kj::READY_NOW).fork()),
clientContext(kj::heap<ClientContext>(context->getIoProvider().wrapSocketFd(socketFd))) {}
};
EzRpcClient::EzRpcClient(kj::StringPtr serverAddress, uint defaultPort)
: impl(kj::heap<Impl>(serverAddress, defaultPort)) {}
EzRpcClient::EzRpcClient(struct sockaddr* serverAddress, uint addrSize)
: impl(kj::heap<Impl>(serverAddress, addrSize)) {}
EzRpcClient::EzRpcClient(int socketFd)
: impl(kj::heap<Impl>(socketFd)) {}
EzRpcClient::~EzRpcClient() noexcept(false) {}
Capability::Client EzRpcClient::importCap(kj::StringPtr name) {
KJ_IF_MAYBE(client, impl->clientContext) {
return client->get()->restore(name);
} else {
return impl->setupPromise.addBranch().then(kj::mvCapture(kj::heapString(name),
[this](kj::String&& name) {
return KJ_ASSERT_NONNULL(impl->clientContext)->restore(name);
}));
}
}
kj::AsyncIoProvider& EzRpcClient::getIoProvider() {
return impl->context->getIoProvider();
}
// =======================================================================================
struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskSet::ErrorHandler {
kj::Own<EzRpcContext> context;
struct ExportedCap {
kj::String name;
Capability::Client cap = nullptr;
ExportedCap(kj::StringPtr name, Capability::Client cap)
: name(kj::heapString(name)), cap(cap) {}
ExportedCap() = default;
ExportedCap(const ExportedCap&) = delete;
ExportedCap(ExportedCap&&) = default;
ExportedCap& operator=(const ExportedCap&) = delete;
ExportedCap& operator=(ExportedCap&&) = default;
// Make std::map happy...
};
std::map<kj::StringPtr, ExportedCap> exportMap;
kj::ForkedPromise<uint> portPromise;
kj::TaskSet tasks;
struct ServerContext {
kj::Own<kj::AsyncIoStream> stream;
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
ServerContext(kj::Own<kj::AsyncIoStream>&& stream, SturdyRefRestorer<Text>& restorer)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::SERVER),
rpcSystem(makeRpcServer(network, restorer)) {}
};
Impl(kj::StringPtr bindAddress, uint defaultPort)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto paf = kj::newPromiseAndFulfiller<uint>();
portPromise = paf.promise.fork();
tasks.add(context->getIoProvider().getNetwork().parseLocalAddress(bindAddress, defaultPort)
.then(kj::mvCapture(paf.fulfiller,
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller,
kj::Own<kj::LocalAddress>&& addr) {
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptLoop(kj::mv(listener));
})));
}
Impl(struct sockaddr* bindAddress, uint addrSize)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto listener = context->getIoProvider().getNetwork()
.getLocalSockaddr(bindAddress, addrSize)->listen();
portPromise = kj::Promise<uint>(listener->getPort()).fork();
acceptLoop(kj::mv(listener));
}
Impl(int socketFd, uint port)
: context(EzRpcContext::getThreadLocal()),
portPromise(kj::Promise<uint>(port).fork()),
tasks(*this) {
acceptLoop(context->getIoProvider().wrapListenSocketFd(socketFd));
}
void acceptLoop(kj::Own<kj::ConnectionReceiver>&& listener) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) {
acceptLoop(kj::mv(listener));
auto server = kj::heap<ServerContext>(kj::mv(connection), *this);
// Arrange to destroy the server context when all references are gone, or when the
// EzRpcServer is destroyed (which will destroy the TaskSet).
auto promise = server->network.onDrained();
promise.attach(kj::mv(server));
tasks.add(kj::mv(promise));
})));
}
Capability::Client restore(Text::Reader name) override {
auto iter = exportMap.find(name);
if (iter == exportMap.end()) {
KJ_FAIL_REQUIRE("Server exports no such capability.", name) { break; }
return nullptr;
} else {
return iter->second.cap;
}
}
void taskFailed(kj::Exception&& exception) override {
kj::throwFatalException(kj::mv(exception));
}
};
EzRpcServer::EzRpcServer(kj::StringPtr bindAddress, uint defaultPort)
: impl(kj::heap<Impl>(bindAddress, defaultPort)) {}
EzRpcServer::EzRpcServer(struct sockaddr* bindAddress, uint addrSize)
: impl(kj::heap<Impl>(bindAddress, addrSize)) {}
EzRpcServer::EzRpcServer(int socketFd, uint port)
: impl(kj::heap<Impl>(socketFd, port)) {}
EzRpcServer::~EzRpcServer() noexcept(false) {}
void EzRpcServer::exportCap(kj::StringPtr name, Capability::Client cap) {
Impl::ExportedCap entry(kj::heapString(name), cap);
impl->exportMap[entry.name] = kj::mv(entry);
}
kj::Promise<uint> EzRpcServer::getPort() {
return impl->portPromise.addBranch();
}
kj::AsyncIoProvider& EzRpcServer::getIoProvider() {
return impl->context->getIoProvider();
}
} // namespace capnp
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNP_EZ_RPC_H_
#define CAPNP_EZ_RPC_H_
#include "rpc.h"
namespace kj { class AsyncIoProvider; }
namespace capnp {
class EzRpcContext;
class EzRpcClient {
// Super-simple interface for setting up a Cap'n Proto RPC client. Example:
//
// # Cap'n Proto schema
// interface Adder {
// add @0 (left :Int32, right :Int32) -> (result :Int32);
// }
//
// // C++ client
// int main() {
// EzRpcClient client("10.0.1.2:3456");
// Adder::Client adder = client.getCap<Adder>("adder");
// auto request = adder.frobRequest();
// request.setLeft(12);
// request.setRight(34);
// auto response = request.wait();
// assert(response.getResult() == 46);
// return 0;
// }
//
// // C++ server
// class AdderImpl: public Adder::Server {
// public:
// kj::Promise<void> frob(FrobParams::Reader params, FrobResults::Builder results) {
// results.setResult(params.getLeft() + params.getRight());
// return kj::READY_NOW;
// }
// };
//
// int main() {
// EzRpcServer server(":3456");
// server.exportCap("adder", kj::heap<AdderImpl>());
// kj::NEVER_DONE.wait();
// }
//
// This interface is easy, but it hides a lot of useful features available from the lower-level
// classes:
// - The server can only export a small set of public, singleton capabilities under well-known
// string names. This is fine for transient services where no state needs to be kept between
// connections, but hides the power of Cap'n Proto when it comes to long-lived resources.
// - EzRpcClient/EzRpcServer automatically set up a `kj::EventLoop`. Only one `kj::EventLoop`
// can exist per thread, so you cannot use these interfaces if you wish to set up your own
// event loop. (However, you can safely create multiple EzRpcClient / EzRpcServer objects
// in a single thread; they will make sure to make no more than one EventLoop.)
// - These classes only support simple two-party connections, not multilateral VatNetworks.
public:
explicit EzRpcClient(kj::StringPtr serverAddress, uint defaultPort = 0);
// Construct a new EzRpcClient and connect to the given address. The connection is formed in
// the background -- if it fails, calls to capabilities returned by importCap() will fail with an
// appropriate exception.
//
// `defaultPort` is the IP port number to use if `serverAddress` does not include it explicitly.
// If unspecified, the port is required in `serverAddress`.
//
// The address is parsed by `kj::Network` in `kj/async-io.h`. See that interface for more info
// on the address format, but basically it's what you'd expect.
EzRpcClient(struct sockaddr* serverAddress, uint addrSize);
// Like the above constructor, but connects to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
explicit EzRpcClient(int socketFd);
// Create a client on top of an already-connected socket.
~EzRpcClient() noexcept(false);
template <typename Type>
typename Type::Client importCap(kj::StringPtr name);
Capability::Client importCap(kj::StringPtr name);
// Ask the sever for the capability with the given name. You may specify a type to automatically
// down-cast to that type. It is up to you to specify the correct expected type.
kj::AsyncIoProvider& getIoProvider();
// Get the underlying AsyncIoProvider set up by the RPC system. This is useful if you want
// to do some non-RPC I/O in asynchronous fashion.
private:
struct Impl;
kj::Own<Impl> impl;
};
class EzRpcServer {
// The server counterpart to `EzRpcClient`. See `EzRpcClient` for an example.
public:
explicit EzRpcServer(kj::StringPtr bindAddress, uint deafultPort = 0);
// Construct a new `EzRpcServer` that binds to the given address. An address of "*" means to
// bind to all local addresses.
//
// `defaultPort` is the IP port number to use if `serverAddress` does not include it explicitly.
// If unspecified, a port is chosen automatically, and you must call getPort() to find out what
// it is.
//
// The address is parsed by `kj::Network` in `kj/async-io.h`. See that interface for more info
// on the address format, but basically it's what you'd expect.
//
// The server might not begin listening immediately, especially if `bindAddress` needs to be
// resolved. If you need to wait until the server is definitely up, wait on the promise returned
// by `getPort()`.
EzRpcServer(struct sockaddr* bindAddress, uint addrSize);
// Like the above constructor, but binds to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
EzRpcServer(int socketFd, uint port);
// Create a server on top of an already-listening socket (i.e. one on which accept() may be
// called). `port` is returned by `getPort()` -- it serves no other purpose.
~EzRpcServer() noexcept(false);
void exportCap(kj::StringPtr name, Capability::Client cap);
// Export a capability publicly under the given name, so that clients can import it.
//
// Keep in mind that you can implicitly convert `kj::Own<MyType::Server>&&` to
// `Capability::Client`, so it's typicall to pass something like
// `kj::heap<MyImplementation>(<constructor params>)` as the second parameter.
kj::Promise<uint> getPort();
// Get the IP port number on which this server is listening. This promise won't resolve until
// the server is actually listening. If the address was not an IP address (e.g. it was a Unix
// domain socket) then getPort() resolves to zero.
kj::AsyncIoProvider& getIoProvider();
// Get the underlying AsyncIoProvider set up by the RPC system. This is useful if you want
// to do some non-RPC I/O in asynchronous fashion.
private:
struct Impl;
kj::Own<Impl> impl;
};
// =======================================================================================
// inline implementation details
template <typename Type>
inline typename Type::Client EzRpcClient::importCap(kj::StringPtr name) {
return importCap(name).castAs<Type>();
}
} // namespace capnp
#endif // CAPNP_EZ_RPC_H_
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// This file contains a bunch of internal declarations that must appear before rpc.h can start.
// We don't define these directly in rpc.h because it makes the file hard to read.
#ifndef CAPNP_RPC_PRELUDE_H_
#define CAPNP_RPC_PRELUDE_H_
#include "capability.h"
namespace capnp {
class OutgoingRpcMessage;
class IncomingRpcMessage;
template <typename SturdyRefHostId>
class RpcSystem;
namespace _ { // private
class VatNetworkBase {
// Non-template version of VatNetwork. Ignore this class; see VatNetwork, below.
public:
class Connection;
struct ConnectionAndProvisionId {
kj::Own<Connection> connection;
kj::Own<OutgoingRpcMessage> firstMessage;
Orphan<ObjectPointer> provisionId;
};
class Connection {
public:
virtual kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) = 0;
virtual kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() = 0;
virtual void baseIntroduceTo(Connection& recipient,
ObjectPointer::Builder sendToRecipient,
ObjectPointer::Builder sendToTarget) = 0;
virtual ConnectionAndProvisionId baseConnectToIntroduced(
ObjectPointer::Reader capId) = 0;
virtual kj::Own<Connection> baseAcceptIntroducedConnection(
ObjectPointer::Reader recipientId) = 0;
};
virtual kj::Maybe<kj::Own<Connection>> baseConnectToRefHost(_::StructReader hostId) = 0;
virtual kj::Promise<kj::Own<Connection>> baseAcceptConnectionAsRefHost() = 0;
};
class SturdyRefRestorerBase {
public:
virtual Capability::Client baseRestore(ObjectPointer::Reader ref) = 0;
};
class RpcSystemBase {
public:
RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer);
RpcSystemBase(RpcSystemBase&& other) noexcept;
~RpcSystemBase() noexcept(false);
private:
class Impl;
kj::Own<Impl> impl;
Capability::Client baseRestore(_::StructReader hostId, ObjectPointer::Reader objectId);
// TODO(someday): Maybe define a public API called `TypelessStruct` so we don't have to rely
// on `_::StructReader` here?
template <typename>
friend class capnp::RpcSystem;
};
} // namespace _ (private)
} // namespace capnp
#endif // CAPNP_RPC_PRELUDE_H_
......@@ -37,6 +37,12 @@ typedef VatNetwork<rpc::twoparty::SturdyRefHostId, rpc::twoparty::ProvisionId,
class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
private TwoPartyVatNetworkBase::Connection {
// A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
// stream. This is used to implement the common case of a client/server network.
//
// See `ez-rpc.h` for a simple interface for setting up two-party clients and servers.
// Use `TwoPartyVatNetwork` only if you need the advanced features.
public:
TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
ReaderOptions receiveOptions = ReaderOptions());
......
This diff is collapsed.
......@@ -512,25 +512,45 @@ public:
Promise<Own<AsyncIoStream>> accept() override {
int newFd;
retry:
#if __linux__
KJ_NONBLOCKING_SYSCALL(newFd = ::accept4(fd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC)) {
// error
return nullptr;
}
newFd = ::accept4(fd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
KJ_NONBLOCKING_SYSCALL(newFd = ::accept(fd, nullptr, nullptr)) {
// error
return nullptr;
}
newFd = ::accept(fd, nullptr, nullptr);
#endif
if (newFd < 0) {
// Gotta wait.
return eventPort.onFdEvent(fd, POLLIN).then([this](short) {
return accept();
});
} else {
if (newFd >= 0) {
return Own<AsyncIoStream>(heap<Socket>(eventPort, newFd));
} else {
int error = errno;
switch (error) {
case EAGAIN:
#if EAGAIN != EWOULDBLOCK
case EWOULDBLOCK:
#endif
// Not ready yet.
return eventPort.onFdEvent(fd, POLLIN).then([this](short) {
return accept();
});
case EINTR:
case ENETDOWN:
case EPROTO:
case ENOPROTOOPT:
case EHOSTDOWN:
case ENONET:
case EHOSTUNREACH:
case EOPNOTSUPP:
case ENETUNREACH:
case ECONNABORTED:
// The incoming connection is dead-on-arrival. Just ignore it.
goto retry;
default:
KJ_FAIL_SYSCALL("accept", error);
}
}
}
......@@ -699,6 +719,10 @@ public:
setNonblocking(fd);
return heap<AsyncStreamFd>(eventPort, fd, fd);
}
Own<ConnectionReceiver> wrapListenSocketFd(int fd) override {
setNonblocking(fd);
return heap<FdConnectionReceiver>(eventPort, fd);
}
private:
UnixEventPort eventPort;
......
......@@ -217,6 +217,14 @@ public:
//
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
virtual Own<ConnectionReceiver> wrapListenSocketFd(int fd) = 0;
// Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already
// have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
//
// Does not take ownership of the descriptor.
//
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
// ---------------------------------------------------------------------------
// Windows-only methods
......
......@@ -240,6 +240,9 @@ EventLoop::~EventLoop() noexcept(false) {
}
void EventLoop::run(uint maxTurnCount) {
running = true;
KJ_DEFER(running = false);
for (uint i = 0; i < maxTurnCount; i++) {
if (!turn()) {
break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment