Commit 822d2399 authored by Kenton Varda's avatar Kenton Varda

Add simple convenience classes for using TwoPartyVatNetwork as client or server,…

Add simple convenience classes for using TwoPartyVatNetwork as client or server, since I find myself rewriting this code over and over.
parent 4bbc78b8
......@@ -308,6 +308,34 @@ TEST(TwoPartyNetwork, Abort) {
EXPECT_TRUE(conn->receiveIncomingMessage().wait(ioContext.waitScope) == nullptr);
}
TEST(TwoPartyNetwork, ConvenienceClasses) {
auto ioContext = kj::setupAsyncIo();
int callCount = 0;
TwoPartyServer server(kj::heap<TestInterfaceImpl>(callCount));
auto address = ioContext.provider->getNetwork()
.parseAddress("127.0.0.1").wait(ioContext.waitScope);
auto listener = address->listen();
auto listenPromise = server.listen(*listener);
address = ioContext.provider->getNetwork()
.parseAddress("127.0.0.1", listener->getPort()).wait(ioContext.waitScope);
auto connection = address->connect().wait(ioContext.waitScope);
TwoPartyClient client(*connection);
auto cap = client.bootstrap().castAs<test::TestInterface>();
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
EXPECT_EQ(0, callCount);
auto response = request.send().wait(ioContext.waitScope);
EXPECT_EQ("foo", response.getX());
EXPECT_EQ(1, callCount);
}
} // namespace
} // namespace _
} // namespace capnp
......@@ -141,4 +141,49 @@ kj::Promise<void> TwoPartyVatNetwork::shutdown() {
return kj::mv(result);
}
// =======================================================================================
TwoPartyServer::TwoPartyServer(Capability::Client bootstrapInterface)
: bootstrapInterface(kj::mv(bootstrapInterface)), tasks(*this) {}
struct TwoPartyServer::AcceptedConnection {
kj::Own<kj::AsyncIoStream> connection;
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::VatId> rpcSystem;
explicit AcceptedConnection(Capability::Client bootstrapInterface,
kj::Own<kj::AsyncIoStream>&& connectionParam)
: connection(kj::mv(connectionParam)),
network(*connection, rpc::twoparty::Side::SERVER),
rpcSystem(makeRpcServer(network, kj::mv(bootstrapInterface))) {}
};
kj::Promise<void> TwoPartyServer::listen(kj::ConnectionReceiver& listener) {
return listener.accept()
.then([this,&listener](kj::Own<kj::AsyncIoStream>&& connection) mutable {
auto connectionState = kj::heap<AcceptedConnection>(bootstrapInterface, kj::mv(connection));
// Run the connection until disconnect.
auto promise = connectionState->network.onDisconnect();
tasks.add(promise.attach(kj::mv(connectionState)));
return listen(listener);
});
}
void TwoPartyServer::taskFailed(kj::Exception&& exception) {
KJ_LOG(ERROR, exception);
}
TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection)
: network(connection, rpc::twoparty::Side::CLIENT),
rpcSystem(makeRpcClient(network)) {}
Capability::Client TwoPartyClient::bootstrap() {
MallocMessageBuilder message(4);
auto vatId = message.getRoot<rpc::twoparty::VatId>();
vatId.setSide(rpc::twoparty::Side::SERVER);
return rpcSystem.bootstrap(vatId);
}
} // namespace capnp
......@@ -54,6 +54,7 @@ class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
public:
TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
ReaderOptions receiveOptions = ReaderOptions());
KJ_DISALLOW_COPY(TwoPartyVatNetwork);
kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
// Returns a promise that resolves when the peer disconnects.
......@@ -107,6 +108,41 @@ private:
kj::Promise<void> shutdown() override;
};
class TwoPartyServer: private kj::TaskSet::ErrorHandler {
// Convenience class which implements a simple server which accepts connections on a listener
// socket and serices them as two-party connections.
public:
explicit TwoPartyServer(Capability::Client bootstrapInterface);
kj::Promise<void> listen(kj::ConnectionReceiver& listener);
// Listens for connections on the given listener. The returned promise never resolves unless an
// exception is thrown while trying to accept. You may discard the returned promise to cancel
// listening.
private:
Capability::Client bootstrapInterface;
kj::TaskSet tasks;
struct AcceptedConnection;
void taskFailed(kj::Exception&& exception) override;
};
class TwoPartyClient {
// Convenience class which implements a simple client.
public:
explicit TwoPartyClient(kj::AsyncIoStream& connection);
Capability::Client bootstrap();
// Get the server's bootstrap interface.
private:
TwoPartyVatNetwork network;
RpcSystem<rpc::twoparty::VatId> rpcSystem;
};
} // namespace capnp
#endif // CAPNP_RPC_TWOPARTY_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment