Commit f0335e7b authored by Matthew Maurer's avatar Matthew Maurer

Add ReaderOptions configurability to EzRpc

Add a defaulted ReaderOptions argument to the constructors for
EzRpcServer and EzRpcClient to allow large/deep message uses more
easily.
parent 4023a354
......@@ -80,9 +80,9 @@ struct EzRpcClient::Impl {
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
ClientContext(kj::Own<kj::AsyncIoStream>&& stream)
ClientContext(kj::Own<kj::AsyncIoStream>&& stream, ReaderOptions readerOpts)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::CLIENT),
network(*this->stream, rpc::twoparty::Side::CLIENT, readerOpts),
rpcSystem(makeRpcClient(network)) {}
Capability::Client restore(kj::StringPtr name) {
......@@ -102,39 +102,44 @@ struct EzRpcClient::Impl {
kj::Maybe<kj::Own<ClientContext>> clientContext;
// Filled in before `setupPromise` resolves.
Impl(kj::StringPtr serverAddress, uint defaultPort)
Impl(kj::StringPtr serverAddress, uint defaultPort,
ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()),
setupPromise(context->getIoProvider().getNetwork()
.parseAddress(serverAddress, defaultPort)
.then([](kj::Own<kj::NetworkAddress>&& addr) {
.then([readerOpts](kj::Own<kj::NetworkAddress>&& addr) {
return addr->connect();
}).then([this](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream));
}).then([this, readerOpts](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream),
readerOpts);
}).fork()) {}
Impl(const struct sockaddr* serverAddress, uint addrSize)
Impl(const struct sockaddr* serverAddress, uint addrSize,
ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()),
setupPromise(context->getIoProvider().getNetwork()
.getSockaddr(serverAddress, addrSize)->connect()
.then([this](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream));
.then([this, readerOpts](kj::Own<kj::AsyncIoStream>&& stream) {
clientContext = kj::heap<ClientContext>(kj::mv(stream),
readerOpts);
}).fork()) {}
Impl(int socketFd)
Impl(int socketFd, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()),
setupPromise(kj::Promise<void>(kj::READY_NOW).fork()),
clientContext(kj::heap<ClientContext>(
context->getLowLevelIoProvider().wrapSocketFd(socketFd))) {}
context->getLowLevelIoProvider().wrapSocketFd(socketFd),
readerOpts)) {}
};
EzRpcClient::EzRpcClient(kj::StringPtr serverAddress, uint defaultPort)
: impl(kj::heap<Impl>(serverAddress, defaultPort)) {}
EzRpcClient::EzRpcClient(kj::StringPtr serverAddress, uint defaultPort, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(serverAddress, defaultPort, readerOpts)) {}
EzRpcClient::EzRpcClient(const struct sockaddr* serverAddress, uint addrSize)
: impl(kj::heap<Impl>(serverAddress, addrSize)) {}
EzRpcClient::EzRpcClient(const struct sockaddr* serverAddress, uint addrSize, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(serverAddress, addrSize, readerOpts)) {}
EzRpcClient::EzRpcClient(int socketFd)
: impl(kj::heap<Impl>(socketFd)) {}
EzRpcClient::EzRpcClient(int socketFd, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(socketFd, readerOpts)) {}
EzRpcClient::~EzRpcClient() noexcept(false) {}
......@@ -192,50 +197,51 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::SturdyRefHostId> rpcSystem;
ServerContext(kj::Own<kj::AsyncIoStream>&& stream, SturdyRefRestorer<Text>& restorer)
ServerContext(kj::Own<kj::AsyncIoStream>&& stream, SturdyRefRestorer<Text>& restorer,
ReaderOptions readerOpts)
: stream(kj::mv(stream)),
network(*this->stream, rpc::twoparty::Side::SERVER),
network(*this->stream, rpc::twoparty::Side::SERVER, readerOpts),
rpcSystem(makeRpcServer(network, restorer)) {}
};
Impl(kj::StringPtr bindAddress, uint defaultPort)
Impl(kj::StringPtr bindAddress, uint defaultPort, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto paf = kj::newPromiseAndFulfiller<uint>();
portPromise = paf.promise.fork();
tasks.add(context->getIoProvider().getNetwork().parseAddress(bindAddress, defaultPort)
.then(kj::mvCapture(paf.fulfiller,
[this](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) {
[this, readerOpts](kj::Own<kj::PromiseFulfiller<uint>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) {
auto listener = addr->listen();
portFulfiller->fulfill(listener->getPort());
acceptLoop(kj::mv(listener));
acceptLoop(kj::mv(listener), readerOpts);
})));
}
Impl(struct sockaddr* bindAddress, uint addrSize)
Impl(struct sockaddr* bindAddress, uint addrSize, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()), portPromise(nullptr), tasks(*this) {
auto listener = context->getIoProvider().getNetwork()
.getSockaddr(bindAddress, addrSize)->listen();
portPromise = kj::Promise<uint>(listener->getPort()).fork();
acceptLoop(kj::mv(listener));
acceptLoop(kj::mv(listener), readerOpts);
}
Impl(int socketFd, uint port)
Impl(int socketFd, uint port, ReaderOptions readerOpts)
: context(EzRpcContext::getThreadLocal()),
portPromise(kj::Promise<uint>(port).fork()),
tasks(*this) {
acceptLoop(context->getLowLevelIoProvider().wrapListenSocketFd(socketFd));
acceptLoop(context->getLowLevelIoProvider().wrapListenSocketFd(socketFd), readerOpts);
}
void acceptLoop(kj::Own<kj::ConnectionReceiver>&& listener) {
void acceptLoop(kj::Own<kj::ConnectionReceiver>&& listener, ReaderOptions readerOpts) {
auto ptr = listener.get();
tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
[this](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) {
acceptLoop(kj::mv(listener));
[this, readerOpts](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) {
acceptLoop(kj::mv(listener), readerOpts);
auto server = kj::heap<ServerContext>(kj::mv(connection), *this);
auto server = kj::heap<ServerContext>(kj::mv(connection), *this, readerOpts);
// Arrange to destroy the server context when all references are gone, or when the
// EzRpcServer is destroyed (which will destroy the TaskSet).
......@@ -258,14 +264,16 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
}
};
EzRpcServer::EzRpcServer(kj::StringPtr bindAddress, uint defaultPort)
: impl(kj::heap<Impl>(bindAddress, defaultPort)) {}
EzRpcServer::EzRpcServer(kj::StringPtr bindAddress, uint defaultPort,
ReaderOptions readerOpts)
: impl(kj::heap<Impl>(bindAddress, defaultPort, readerOpts)) {}
EzRpcServer::EzRpcServer(struct sockaddr* bindAddress, uint addrSize)
: impl(kj::heap<Impl>(bindAddress, addrSize)) {}
EzRpcServer::EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts)
: impl(kj::heap<Impl>(bindAddress, addrSize, readerOpts)) {}
EzRpcServer::EzRpcServer(int socketFd, uint port)
: impl(kj::heap<Impl>(socketFd, port)) {}
EzRpcServer::EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts)
: impl(kj::heap<Impl>(socketFd, port, readerOpts)) {}
EzRpcServer::~EzRpcServer() noexcept(false) {}
......
......@@ -23,6 +23,7 @@
#define CAPNP_EZ_RPC_H_
#include "rpc.h"
#include "message.h"
struct sockaddr;
......@@ -91,7 +92,8 @@ class EzRpcClient {
// - `TwoPartyVatNetwork` in `capnp/rpc-twoparty.h`.
public:
explicit EzRpcClient(kj::StringPtr serverAddress, uint defaultPort = 0);
explicit EzRpcClient(kj::StringPtr serverAddress, uint defaultPort = 0,
ReaderOptions readerOpts = ReaderOptions());
// Construct a new EzRpcClient and connect to the given address. The connection is formed in
// the background -- if it fails, calls to capabilities returned by importCap() will fail with an
// appropriate exception.
......@@ -101,13 +103,23 @@ public:
//
// The address is parsed by `kj::Network` in `kj/async-io.h`. See that interface for more info
// on the address format, but basically it's what you'd expect.
//
// `readerOpts` is the ReaderOptions structure used by capnproto to limit the traversal of
// messages. If not specified, a default value of 8M max word traversal and a nesting limit of 64
// deep.
// You should only need to set this if you are receiving errors about messages being too large or
// too deep in normal operation, and should consider restructuring your protocol to use simpler
// or smaller messages if this is an issue for you.
EzRpcClient(const struct sockaddr* serverAddress, uint addrSize);
EzRpcClient(const struct sockaddr* serverAddress, uint addrSize,
ReaderOptions readerOpts = ReaderOptions());
// Like the above constructor, but connects to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
explicit EzRpcClient(int socketFd);
explicit EzRpcClient(int socketFd, ReaderOptions readerOpts = ReaderOptions());
// Create a client on top of an already-connected socket.
// `readerOpts` acts as in the first constructor.
~EzRpcClient() noexcept(false);
......@@ -138,7 +150,8 @@ class EzRpcServer {
// The server counterpart to `EzRpcClient`. See `EzRpcClient` for an example.
public:
explicit EzRpcServer(kj::StringPtr bindAddress, uint defaultPort = 0);
explicit EzRpcServer(kj::StringPtr bindAddress, uint defaultPort = 0,
ReaderOptions readerOpts = ReaderOptions());
// Construct a new `EzRpcServer` that binds to the given address. An address of "*" means to
// bind to all local addresses.
//
......@@ -152,14 +165,23 @@ public:
// The server might not begin listening immediately, especially if `bindAddress` needs to be
// resolved. If you need to wait until the server is definitely up, wait on the promise returned
// by `getPort()`.
EzRpcServer(struct sockaddr* bindAddress, uint addrSize);
//
// `readerOpts` is the ReaderOptions structure used by capnproto to limit the traversal of
// messages. If not specified, a default value of 8M max word traversal and a nesting limit of 64
// deep.
// You should only need to set this if you are receiving errors about messages being too large or
// too deep in normal operation, and should consider restructuring your protocol to use simpler
// or smaller messages if this is an issue for you.
EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
ReaderOptions readerOpts = ReaderOptions());
// Like the above constructor, but binds to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
EzRpcServer(int socketFd, uint port);
EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts = ReaderOptions());
// Create a server on top of an already-listening socket (i.e. one on which accept() may be
// called). `port` is returned by `getPort()` -- it serves no other purpose.
// `readerOpts` acts as in the other two above constructors.
~EzRpcServer() noexcept(false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment