Commit 490a7a2a authored by Kenton Varda's avatar Kenton Varda

Implement two-party network. The first RPC call over a socket took place at…

Implement two-party network.  The first RPC call over a socket took place at 2013-11-08 14:46:43 -0800 and completed successfully.
parent ecab2520
......@@ -7,4 +7,4 @@ export PATH=$PWD/bin:$PWD:$PATH
capnp compile -Isrc --no-standard-import --src-prefix=src -oc++:src \
src/capnp/c++.capnp src/capnp/schema.capnp \
src/capnp/compiler/lexer.capnp src/capnp/compiler/grammar.capnp \
src/capnp/rpc.capnp
src/capnp/rpc.capnp src/capnp/rpc-twoparty.capnp
......@@ -53,7 +53,8 @@ fi
mkdir -p tmp/capnp/bootstrap-test-tmp
INPUTS="capnp/c++.capnp capnp/schema.capnp capnp/compiler/lexer.capnp capnp/compiler/grammar.capnp"
INPUTS="capnp/c++.capnp capnp/schema.capnp capnp/compiler/lexer.capnp capnp/compiler/grammar.capnp \
capnp/rpc.capnp capnp/rpc-twoparty.capnp"
SRC_INPUTS=""
for file in $INPUTS; do
......
......@@ -50,35 +50,6 @@ namespace {
#define EXPECT_NONFATAL_FAILURE EXPECT_ANY_THROW
#endif
class TestInterfaceImpl final: public test::TestInterface::Server {
public:
TestInterfaceImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override {
++callCount;
EXPECT_EQ(123, params.getI());
EXPECT_TRUE(params.getJ());
result.setX("foo");
return kj::READY_NOW;
}
::kj::Promise<void> bazAdvanced(
::capnp::CallContext<test::TestInterface::BazParams,
test::TestInterface::BazResults> context) override {
++callCount;
auto params = context.getParams();
checkTestMessage(params.getS());
context.releaseParams();
EXPECT_ANY_THROW(context.getParams());
return kj::READY_NOW;
}
};
TEST(Capability, Basic) {
kj::SimpleEventLoop loop;
......@@ -117,33 +88,6 @@ TEST(Capability, Basic) {
EXPECT_TRUE(barFailed);
}
class TestExtendsImpl final: public test::TestExtends::Server {
public:
TestExtendsImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override {
++callCount;
EXPECT_EQ(321, params.getI());
EXPECT_FALSE(params.getJ());
result.setX("bar");
return kj::READY_NOW;
}
::kj::Promise<void> graultAdvanced(
::capnp::CallContext<test::TestExtends::GraultParams, test::TestAllTypes> context) override {
++callCount;
context.releaseParams();
initTestMessage(context.getResults());
return kj::READY_NOW;
}
};
TEST(Capability, Inheritance) {
kj::SimpleEventLoop loop;
......@@ -172,38 +116,6 @@ TEST(Capability, Inheritance) {
EXPECT_EQ(2, callCount);
}
class TestPipelineImpl final: public test::TestPipeline::Server {
public:
TestPipelineImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> getCapAdvanced(
capnp::CallContext<test::TestPipeline::GetCapParams,
test::TestPipeline::GetCapResults> context) override {
++callCount;
auto params = context.getParams();
EXPECT_EQ(234, params.getN());
auto cap = params.getInCap();
context.releaseParams();
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
return request.send().then(
[this,context](capnp::Response<test::TestInterface::FooResults>&& response) mutable {
EXPECT_EQ("foo", response.getX());
auto result = context.getResults();
result.setS("bar");
result.initOutBox().setCap(kj::heap<TestExtendsImpl>(callCount));
});
}
};
TEST(Capability, Pipelining) {
kj::SimpleEventLoop loop;
......
......@@ -36,6 +36,7 @@ case "$INPUT" in
*capnp/c++.capnp | \
*capnp/schema.capnp | \
*capnp/rpc.capnp | \
*capnp/rpc-twoparty.capnp | \
*capnp/compiler/lexer.capnp | \
*capnp/compiler/grammar.capnp )
exit 0
......
......@@ -25,6 +25,7 @@
#include "test-util.h"
#include <kj/debug.h>
#include <gtest/gtest.h>
#include <capnp/rpc.capnp.h>
#include <map>
#include <queue>
......@@ -55,8 +56,8 @@ private:
};
typedef VatNetwork<
test::TestSturdyRef, test::TestProvisionId, test::TestRecipientId, test::TestThirdPartyCapId,
test::TestJoinAnswer> TestNetworkAdapterBase;
test::TestSturdyRefHostId, test::TestProvisionId, test::TestRecipientId,
test::TestThirdPartyCapId, test::TestJoinAnswer> TestNetworkAdapterBase;
class TestNetworkAdapter final: public TestNetworkAdapterBase {
public:
......@@ -153,8 +154,9 @@ public:
kj::MutexGuarded<Queues> queues;
};
kj::Own<Connection> connectToHostOf(test::TestSturdyRef::Reader ref) override {
const TestNetworkAdapter& dst = KJ_REQUIRE_NONNULL(network.find(ref.getHost()));
kj::Maybe<kj::Own<Connection>> connectToRefHost(
test::TestSturdyRefHostId::Reader hostId) override {
const TestNetworkAdapter& dst = KJ_REQUIRE_NONNULL(network.find(hostId.getHost()));
kj::Locked<State> myLock;
kj::Locked<State> dstLock;
......@@ -183,9 +185,9 @@ public:
dstLock->fulfillerQueue.pop();
}
return kj::mv(local);
return kj::Own<Connection>(kj::mv(local));
} else {
return kj::addRef(*iter->second);
return kj::Own<Connection>(kj::addRef(*iter->second));
}
}
......@@ -222,103 +224,17 @@ TestNetworkAdapter& TestNetwork::add(kj::StringPtr name) {
// =======================================================================================
class TestInterfaceImpl final: public test::TestInterface::Server {
public:
TestInterfaceImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override {
++callCount;
EXPECT_EQ(123, params.getI());
EXPECT_TRUE(params.getJ());
result.setX("foo");
return kj::READY_NOW;
}
::kj::Promise<void> bazAdvanced(
::capnp::CallContext<test::TestInterface::BazParams,
test::TestInterface::BazResults> context) override {
++callCount;
auto params = context.getParams();
checkTestMessage(params.getS());
context.releaseParams();
EXPECT_ANY_THROW(context.getParams());
return kj::READY_NOW;
}
};
class TestExtendsImpl final: public test::TestExtends::Server {
public:
TestExtendsImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override {
++callCount;
EXPECT_EQ(321, params.getI());
EXPECT_FALSE(params.getJ());
result.setX("bar");
return kj::READY_NOW;
}
::kj::Promise<void> graultAdvanced(
::capnp::CallContext<test::TestExtends::GraultParams, test::TestAllTypes> context) override {
++callCount;
context.releaseParams();
initTestMessage(context.getResults());
return kj::READY_NOW;
}
};
class TestPipelineImpl final: public test::TestPipeline::Server {
public:
TestPipelineImpl(int& callCount): callCount(callCount) {}
int& callCount;
::kj::Promise<void> getCapAdvanced(
capnp::CallContext<test::TestPipeline::GetCapParams,
test::TestPipeline::GetCapResults> context) override {
++callCount;
auto params = context.getParams();
EXPECT_EQ(234, params.getN());
auto cap = params.getInCap();
context.releaseParams();
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
return request.send().then(
[this,context](capnp::Response<test::TestInterface::FooResults>&& response) mutable {
EXPECT_EQ("foo", response.getX());
auto result = context.getResults();
result.setS("bar");
result.initOutBox().setCap(kj::heap<TestExtendsImpl>(callCount));
});
}
};
class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRef> {
class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
public:
int callCount = 0;
Capability::Client restore(test::TestSturdyRef::Reader ref) override {
switch (ref.getTag()) {
case test::TestSturdyRef::Tag::TEST_INTERFACE:
Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
switch (objectId.getTag()) {
case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
return kj::heap<TestInterfaceImpl>(callCount);
case test::TestSturdyRef::Tag::TEST_PIPELINE:
case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
return Capability::Client(newBrokenCap("No TestExtends implemented."));
case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
return kj::heap<TestPipelineImpl>(callCount);
}
KJ_UNREACHABLE;
......@@ -330,16 +246,17 @@ protected:
TestNetwork network;
TestRestorer restorer;
kj::SimpleEventLoop loop;
RpcSystem<test::TestSturdyRef> rpcClient;
RpcSystem<test::TestSturdyRef> rpcServer;
RpcSystem<test::TestSturdyRefHostId> rpcClient;
RpcSystem<test::TestSturdyRefHostId> rpcServer;
Capability::Client connect(test::TestSturdyRef::Tag tag) {
Capability::Client connect(test::TestSturdyRefObjectId::Tag tag) {
MallocMessageBuilder refMessage(128);
auto ref = refMessage.initRoot<test::TestSturdyRef>();
ref.setHost("server");
ref.setTag(tag);
auto ref = refMessage.initRoot<rpc::SturdyRef>();
auto hostId = ref.getHostId().initAs<test::TestSturdyRefHostId>();
hostId.setHost("server");
ref.getObjectId().initAs<test::TestSturdyRefObjectId>().setTag(tag);
return rpcClient.connect(ref);
return rpcClient.connect(hostId, ref.getObjectId());
}
RpcTest()
......@@ -352,7 +269,8 @@ protected:
};
TEST_F(RpcTest, Basic) {
auto client = connect(test::TestSturdyRef::Tag::TEST_INTERFACE).castAs<test::TestInterface>();
auto client = connect(test::TestSturdyRefObjectId::Tag::TEST_INTERFACE)
.castAs<test::TestInterface>();
auto request1 = client.fooRequest();
request1.setI(123);
......@@ -387,7 +305,8 @@ TEST_F(RpcTest, Basic) {
}
TEST_F(RpcTest, Pipelining) {
auto client = connect(test::TestSturdyRef::Tag::TEST_PIPELINE).castAs<test::TestPipeline>();
auto client = connect(test::TestSturdyRefObjectId::Tag::TEST_PIPELINE)
.castAs<test::TestPipeline>();
int chainedCallCount = 0;
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "rpc-twoparty.h"
#include "test-util.h"
#include <kj/async-unix.h>
#include <kj/debug.h>
#include <kj/thread.h>
#include <gtest/gtest.h>
namespace capnp {
namespace _ {
namespace {
class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
public:
TestRestorer(int& callCount): callCount(callCount) {}
Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
switch (objectId.getTag()) {
case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
return kj::heap<TestInterfaceImpl>(callCount);
case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
return Capability::Client(newBrokenCap("No TestExtends implemented."));
case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
return kj::heap<TestPipelineImpl>(callCount);
}
KJ_UNREACHABLE;
}
private:
int& callCount;
};
void runServer(kj::Promise<void> quit, kj::Own<kj::AsyncIoStream> stream, int& callCount) {
kj::UnixEventLoop eventLoop;
TwoPartyVatNetwork network(eventLoop, *stream, rpc::twoparty::Side::SERVER);
TestRestorer restorer(callCount);
auto server = makeRpcServer(network, restorer, eventLoop);
eventLoop.wait(kj::mv(quit));
}
Capability::Client connect(RpcSystem<rpc::twoparty::SturdyRefHostId>& client,
rpc::twoparty::Side side,
test::TestSturdyRefObjectId::Tag tag) {
MallocMessageBuilder hostIdMessage(8);
MallocMessageBuilder objectIdMessage(8);
auto hostId = hostIdMessage.initRoot<rpc::twoparty::SturdyRefHostId>();
hostId.setSide(side);
objectIdMessage.initRoot<test::TestSturdyRefObjectId>().setTag(tag);
return client.connect(hostId, objectIdMessage.getRoot<ObjectPointer>());
}
TEST(TwoPartyNetwork, Basic) {
auto quitter = kj::newPromiseAndFulfiller<void>();
auto pipe = kj::newTwoWayPipe();
int callCount = 0;
kj::Thread thread([&]() {
runServer(kj::mv(quitter.promise), kj::mv(pipe.ends[1]), callCount);
});
KJ_DEFER(quitter.fulfiller->fulfill());
kj::UnixEventLoop loop;
TwoPartyVatNetwork network(loop, *pipe.ends[0], rpc::twoparty::Side::CLIENT);
auto rpcClient = makeRpcClient(network, loop);
auto client = connect(rpcClient, rpc::twoparty::Side::SERVER,
test::TestSturdyRefObjectId::Tag::TEST_INTERFACE).castAs<test::TestInterface>();
auto request1 = client.fooRequest();
request1.setI(123);
request1.setJ(true);
auto promise1 = request1.send();
auto request2 = client.bazRequest();
initTestMessage(request2.initS());
auto promise2 = request2.send();
bool barFailed = false;
auto request3 = client.barRequest();
auto promise3 = loop.there(request3.send(),
[](Response<test::TestInterface::BarResults>&& response) {
ADD_FAILURE() << "Expected bar() call to fail.";
}, [&](kj::Exception&& e) {
barFailed = true;
});
EXPECT_EQ(0, callCount);
auto response1 = loop.wait(kj::mv(promise1));
EXPECT_EQ("foo", response1.getX());
auto response2 = loop.wait(kj::mv(promise2));
loop.wait(kj::mv(promise3));
EXPECT_EQ(2, callCount);
EXPECT_TRUE(barFailed);
}
} // namespace
} // namespace _
} // namespace capnp
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "rpc-twoparty.h"
#include "serialize-async.h"
#include <kj/debug.h>
namespace capnp {
TwoPartyVatNetwork::TwoPartyVatNetwork(
const kj::EventLoop& eventLoop, kj::AsyncIoStream& stream, rpc::twoparty::Side side,
ReaderOptions receiveOptions)
: eventLoop(eventLoop), stream(stream), side(side), receiveOptions(receiveOptions),
previousWrite(kj::READY_NOW) {}
kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> TwoPartyVatNetwork::connectToRefHost(
rpc::twoparty::SturdyRefHostId::Reader ref) {
if (ref.getSide() == side) {
return nullptr;
} else {
return kj::Own<TwoPartyVatNetworkBase::Connection>(this,
kj::DestructorOnlyDisposer<TwoPartyVatNetworkBase::Connection>::instance);
}
}
kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>>
TwoPartyVatNetwork::acceptConnectionAsRefHost() {
if (side == rpc::twoparty::Side::SERVER && !accepted) {
accepted = true;
return kj::Own<TwoPartyVatNetworkBase::Connection>(this,
kj::DestructorOnlyDisposer<TwoPartyVatNetworkBase::Connection>::instance);
} else {
// Create a promise that will never be fulfilled.
auto paf = kj::newPromiseAndFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>();
acceptFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
}
class TwoPartyVatNetwork::OutgoingMessageImpl final
: public OutgoingRpcMessage, public kj::Refcounted {
public:
OutgoingMessageImpl(const TwoPartyVatNetwork& network, uint firstSegmentWordSize)
: network(network),
message(firstSegmentWordSize == 0 ? SUGGESTED_FIRST_SEGMENT_WORDS : firstSegmentWordSize) {}
ObjectPointer::Builder getBody() override {
return message.getRoot<ObjectPointer>();
}
void send() override {
auto lock = network.previousWrite.lockExclusive();
*lock = network.eventLoop.there(network.eventLoop.there(kj::mv(*lock), [this]() {
return writeMessage(network.stream, message);
}), kj::mvCapture(kj::addRef(*this),
[](kj::Own<OutgoingMessageImpl>&& self) -> kj::Promise<void> {
// Hack to force this continuation to run (thus allowing `self` to be released) even if
// no one is waiting on the promise.
return kj::READY_NOW;
}));
}
private:
const TwoPartyVatNetwork& network;
MallocMessageBuilder message;
};
class TwoPartyVatNetwork::IncomingMessageImpl final: public IncomingRpcMessage {
public:
IncomingMessageImpl(kj::Own<MessageReader> message): message(kj::mv(message)) {}
ObjectPointer::Reader getBody() override {
return message->getRoot<ObjectPointer>();
}
private:
kj::Own<MessageReader> message;
};
kj::Own<OutgoingRpcMessage> TwoPartyVatNetwork::newOutgoingMessage(
uint firstSegmentWordSize) const {
return kj::refcounted<OutgoingMessageImpl>(*this, firstSegmentWordSize);
}
kj::Promise<kj::Own<IncomingRpcMessage>> TwoPartyVatNetwork::receiveIncomingMessage() {
return eventLoop.evalLater([&]() {
return readMessage(stream, receiveOptions)
.then([](kj::Own<MessageReader>&& message) -> kj::Own<IncomingRpcMessage> {
return kj::heap<IncomingMessageImpl>(kj::mv(message));
});
});
}
void TwoPartyVatNetwork::introduceTo(Connection& recipient,
rpc::twoparty::ThirdPartyCapId::Builder sendToRecipient,
rpc::twoparty::RecipientId::Builder sendToTarget) {
}
TwoPartyVatNetworkBase::ConnectionAndProvisionId TwoPartyVatNetwork::connectToIntroduced(
rpc::twoparty::ThirdPartyCapId::Reader capId) {
KJ_FAIL_REQUIRE("Three-party introductions should never occur on two-party network.");
}
kj::Own<TwoPartyVatNetworkBase::Connection> TwoPartyVatNetwork::acceptIntroducedConnection(
rpc::twoparty::RecipientId::Reader recipientId) {
KJ_FAIL_REQUIRE("Three-party introductions should never occur on two-party network.");
}
} // namespace capnp
......@@ -23,52 +23,77 @@
@0xa184c7885cdaf2a1;
# This file defines the "network-specific parameters" in rpc.proto to support a network consisting
# of two vats: a container, and the vat it contains. The container may actually be a full sandbox
# that confines the inner app. The contained app can only speak to the outside world through the
# container. Therefore, from the point of view of the containee, the container represents the
# whole world and all capabilities in the rest of the world appear "hosted" by the container.
# Meanwhile, the container proxies any capabilities exported by the containee, and thus the rest of
# the world only sees the container, and treats it as if it were itself the host of all of the
# containee's objects.
# of two vats. Each of these vats may in fact be in communication with other vats, but any
# capabilities they forward must be proxied. Thus, to each end of the connection, all capabilities
# received from the other end appear to live in a single vat.
#
# Two notable use cases for this model include:
# - Regular client-server communications, where a remote client machine (perhaps living on an end
# user's personal device) connects to a server. The server may be part of a cluster, and may
# call on other servers in the cluster to help service the user's request. It may even obtain
# capabilities from these other servers which it passes on to the user. To simplify network
# common traversal problems (e.g. if the user is behind a firewall), it is probably desirable to
# multiplex all communications between the server cluster and the client over the original
# connection rather than form new ones. This connection should use the two-party protocol, as
# the client has no interest in knowing about additional servers.
# - Applications running in a sandbox. A supervisor process may execute a confined application
# such that all of the confined app's communications with the outside world must pass through
# the supervisor. In this case, the connection between the confined app and the supervisor might
# as well use the two-party protocol, because the confined app is intentionally prevented from
# talking to any other vat anyway. Any external resources will be proxied through the supervisor,
# and so to the contained app will appear as if they were hosted by the supervisor itself.
#
# Since there are only two vats in this network, there is never a need for three-way introductions,
# so level 3 is free. Joins _could_ be needed in cases where the container itself participates in
# a network that uses joins, as the container may export two objects to the containee which,
# unbeknownst to it, are actually proxies of the same remote object. However, from the containee's
# point of view, such a join is trivial to request, and the containee never needs to receive join
# requests.
# so level 3 is free. Moreover, because it is never necessary to form new connections, the
# two-party protocol can be used easily anywhere where a two-way byte stream exists, without regard
# to where that byte stream goes or how it was initiated. This makes the two-party runtime library
# highly reusable.
#
# Joins (level 4) _could_ be needed in cases where one or both vats are participating in other
# networks that use joins. For instance, if Alice and Bob are speaking through the two-party
# protocol, and Bob is also participating on another network, Bob may send Alice two or more
# proxied capabilities which, unbeknownst to Bob at the time, are in fact pointing at the same
# remote object. Alice may then request to join these capabilities, at which point Bob will have
# to forward the join to the other network. Note, however, that if Alice is _not_ participating on
# any other network, then Alice will never need to _receive_ a Join, because Alice would always
# know when two locally-hosted capabilities are the same and would never export a redundant alias
# to Bob. So, Alice can respond to all incoming joins with an error, and only needs to implement
# outgoing joins if she herself desires to use this feature. Also, outgoing joins are relatively
# easy to implement in this scenario.
#
# Therefore, a level 4 implementation of the confined network is barely more complicated than a
# level 1 implementation. However, such an implementation is able to make use of the container's
# implementation of whatever network it lives in. Thus, if you have an application that implements
# the confined network at level 4, your application can participate in _any_ network at level 4 so
# long as you pair it with the appropriate container.
# What all this means is that a level 4 implementation of the confined network is barely more
# complicated than a level 2 implementation. However, such an implementation allows the "client"
# or "confined" app to access the server's/supervisor's network with equal functionality to any
# native participant. In other words, an application which implements only the two-party protocol
# can be paired with a proxy app in order to participate in any network.
#
# The "confined" network protocol may also be a reasonable basis for simple two-party client-server
# interactions, where the server plays the part of the container.
# So, when implementing Cap'n Proto in a new language, it makes sense to implement only the
# two-party protocol initially, and then pair applications with an appropriate proxy written in
# C++, rather than implement other parameterizations of the RPC protocol directly.
using Cxx = import "c++.capnp";
$Cxx.namespace("capnp::rpc::confined");
$Cxx.namespace("capnp::rpc::twoparty");
struct SturdyRef {
union {
external @0 :Object;
# The object lives outside the container. The container can handle `Restore` requests for this
# ref. The content of the ref is defined by the container implementation and configuration.
# It may simply be a SturdyRef in the format of the container's external network. However,
# some containers may actually encrypt external references in order to support distributed
# confinement:
enum Side {
server @0;
# The object lives on the "server" or "supervisor" end of the connection. Only the
# server/supervisor knows how to interpret the ref; to the client, it is opaque.
#
# Note that containers intending to implement strong confinement should rewrite SturdyRefs
# received from the external network before passing them on to the confined app. The confined
# app thus does not ever receive the raw bits of the SturdyRef (which it could perhaps
# maliciously leak), but instead receives only a thing that it can pass back to the container
# later to restore the ref. See:
# http://www.erights.org/elib/capability/dist-confine.html
confined @1 :Object;
# The object lives inside the container -- it is implemented by the contained app. That app
# handles `Restore` requests for this ref. The content of the ref is defined by the app
# and opaque to the container. The container may offer the ability to encrypt outgoing
# `SturdyRefs` so that their content is opaque to the outside world; in this case, so long as
# the app trusts the container, it need not encrypt nor sign the ref itself. However, this
# may be undesirable for level 1 applications that only export non-secret singletons anyway,
# so it should be configurable.
}
client @1;
# The object lives on the "client" or "confined app" end of the connection. Only the client
# knows how to interpret the ref; to the server/supervisor, it is opaque. Most clients do not
# actually know how to persist capabilities at all, so use of this is unusual.
}
struct SturdyRefHostId {
side @0 :Side;
}
struct ProvisionId {
......
This diff is collapsed.
This diff is collapsed.
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNP_RPC_TWOPARTY_H_
#define CAPNP_RPC_TWOPARTY_H_
#include "rpc.h"
#include "message.h"
#include <kj/async-io.h>
#include <capnp/rpc-twoparty.capnp.h>
namespace capnp {
typedef VatNetwork<rpc::twoparty::SturdyRefHostId, rpc::twoparty::ProvisionId,
rpc::twoparty::RecipientId, rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinAnswer>
TwoPartyVatNetworkBase;
class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
private TwoPartyVatNetworkBase::Connection {
public:
TwoPartyVatNetwork(const kj::EventLoop& eventLoop, kj::AsyncIoStream& stream,
rpc::twoparty::Side side, ReaderOptions receiveOptions = ReaderOptions());
// implements VatNetwork -----------------------------------------------------
kj::Maybe<kj::Own<Connection>> connectToRefHost(
rpc::twoparty::SturdyRefHostId::Reader ref) override;
kj::Promise<kj::Own<Connection>> acceptConnectionAsRefHost() override;
private:
class OutgoingMessageImpl;
class IncomingMessageImpl;
const kj::EventLoop& eventLoop;
kj::AsyncIoStream& stream;
rpc::twoparty::Side side;
ReaderOptions receiveOptions;
bool accepted = false;
kj::MutexGuarded<kj::Promise<void>> previousWrite;
// Resolves when the previous write completes. This effectively serves as the write queue.
kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller;
// Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the
// second call on the server side. Never fulfilled, because there is only one connection.
// implements Connection -----------------------------------------------------
kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) const override;
kj::Promise<kj::Own<IncomingRpcMessage>> receiveIncomingMessage() override;
void introduceTo(Connection& recipient,
rpc::twoparty::ThirdPartyCapId::Builder sendToRecipient,
rpc::twoparty::RecipientId::Builder sendToTarget) override;
ConnectionAndProvisionId connectToIntroduced(
rpc::twoparty::ThirdPartyCapId::Reader capId) override;
kj::Own<Connection> acceptIntroducedConnection(
rpc::twoparty::RecipientId::Reader recipientId) override;
};
} // namespace capnp
#endif // CAPNP_RPC_TWOPARTY_H_
......@@ -280,7 +280,7 @@ public:
tasks.add(messageLoop());
}
kj::Own<const ClientHook> restore(_::StructReader ref) {
kj::Own<const ClientHook> restore(ObjectPointer::Reader objectId) {
QuestionId questionId;
auto paf = kj::newPromiseAndFulfiller<kj::Own<RpcResponse>>(eventLoop);
......@@ -296,11 +296,11 @@ public:
{
auto message = connection->newOutgoingMessage(
ref.totalSize() / WORDS + messageSizeHint<rpc::Restore>());
objectId.targetSizeInWords() + messageSizeHint<rpc::Restore>());
auto builder = message->getBody().initAs<rpc::Message>().initRestore();
builder.setQuestionId(questionId);
builder.getRef().setInternal(ref);
builder.getObjectId().set(objectId);
message->send();
}
......@@ -1852,7 +1852,7 @@ private:
// Call the restorer and initialize the answer.
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
KJ_IF_MAYBE(r, restorer) {
Capability::Client cap = r->baseRestore(restore.getRef());
Capability::Client cap = r->baseRestore(restore.getObjectId());
auto answer = context.imbue(ret.initAnswer());
answer.setAs<Capability>(cap);
capHook = answer.asReader().getPipelinedCap(nullptr);
......@@ -1919,11 +1919,17 @@ public:
}
}
Capability::Client connect(_::StructReader ref) {
auto connection = network.baseConnectToHostOf(ref);
Capability::Client connect(_::StructReader hostId, ObjectPointer::Reader objectId) {
KJ_IF_MAYBE(connection, network.baseConnectToRefHost(hostId)) {
auto lock = connections.lockExclusive();
auto& state = getConnectionState(kj::mv(connection), *lock);
return Capability::Client(state.restore(ref));
auto& state = getConnectionState(kj::mv(*connection), *lock);
return Capability::Client(state.restore(objectId));
} else KJ_IF_MAYBE(r, restorer) {
return r->baseRestore(objectId);
} else {
return Capability::Client(newBrokenCap(
"SturdyRef referred to a local object but there is no local SturdyRef restorer."));
}
}
void taskFailed(kj::Exception&& exception) override {
......@@ -1979,8 +1985,9 @@ RpcSystemBase::RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestore
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) = default;
RpcSystemBase::~RpcSystemBase() noexcept(false) {}
Capability::Client RpcSystemBase::baseConnect(_::StructReader ref) {
return impl->connect(ref);
Capability::Client RpcSystemBase::baseConnect(
_::StructReader hostId, ObjectPointer::Reader objectId) {
return impl->connect(hostId, objectId);
}
} // namespace _ (private)
......
......@@ -93,10 +93,10 @@
# the "Network-specific Parameters" section below. An implementation might have different levels
# depending on the network used.
#
# New implementations of Cap'n Proto should start out targeting the simplistic "confined" network
# type as defined in `rpc-confined.capnp`. With this network type, level 3 is irrelevant and
# levels 2 and 4 are much easier than usual to implement. When such an implementation is actually
# run inside a container, the contained app effectively gets to make full use of the container's
# New implementations of Cap'n Proto should start out targeting the simplistic two-party network
# type as defined in `rpc-twoparty.capnp`. With this network type, level 3 is irrelevant and
# levels 2 and 4 are much easier than usual to implement. When such an implementation is paired
# with a container proxy, the contained app effectively gets to make full use of the proxy's
# network at level 4. And since Cap'n Proto IPC is extremely fast, it may never make sense to
# bother implementing any other vat network protocol -- just use the correct container type and get
# it for free.
......@@ -465,7 +465,7 @@ struct Restore {
# A new question ID identifying this request, which will eventually receive a Return message
# containing the restored capability.
ref @1 :SturdyRef;
objectId @1 :SturdyRefObjectId;
# Designates the capability to restore.
}
......@@ -492,7 +492,7 @@ struct Delete {
# A new question ID identifying this request, which will eventually receive a Return message
# with an empty answer.
ref @1 :SturdyRef;
objectId @1 :SturdyRefObjectId;
# Designates the capability to delete.
}
......@@ -716,6 +716,20 @@ struct PromisedAnswer {
}
}
struct SturdyRef {
# **(level 2)**
#
# A combination of a SturdyRefObjectId and SturdyRefHostId. This is what a client of the ref
# would typically save in its own storage. This type is also the answer to a `Save` message.
hostId @0 :SturdyRefHostId;
# Describes how to connect to and authenticate a vat that hosts this SturdyRef (and can therefore
# accept a `Restore` message for it).
objectId @1 :SturdyRefObjectId;
# The opaque ref in the scope of the host vat, to be sent in the `Restore` message.
}
struct ThirdPartyCapDescriptor {
# **(level 3)**
#
......@@ -781,15 +795,18 @@ struct Exception {
# particular set of bindings for these types is defined elsewhere. (TODO(soon): Specify where
# these common definitions live.)
#
# Another common network type is the "confined" network, in which a contained vat interacts with
# the outside world entirely through a container/supervisor. All objects in the world that aren't
# hosted by the contained vat appear as if they were hosted by the container. This network type is
# interesting because from the containee's point of view, there are no three-party interactions at
# all, and joins are unusually simple to implement, so implementing at level 4 is barely more
# complicated than implementing at level 1. Moreover, if you pair an app implementing the confined
# network with a container that implements some other network, the app can then participate on
# the container's network just as if it implemented that network directly. The types used by the
# "confined" network are defined in `rpc-confined.capnp`.
# Another common network type is the two-party network, in which one of the parties typically
# interacts with the outside world entirely through the other party. In such a connection between
# Alice and Bob, all objects that exist on Bob's other networks appear to Alice as if they were
# hosted by Bob himself, and similarly all objects on Alice's network (if she even has one) appear
# to Bob as if they were hosted by Alice. This network type is interesting because from the point
# of view of a simple application that communicates with only one other party via the two-party
# protocol, there are no three-party interactions at all, and joins are unusually simple to
# implement, so implementing at level 4 is barely more complicated than implementing at level 1.
# Moreover, if you pair an app implementing the two-party network with a container that implements
# some other network, the app can then participate on the container's network just as if it
# implemented that network directly. The types used by the two-party network are defined in
# `rpc-twoparty.capnp`.
#
# The things which we need to parameterize are:
# - How to store capabilities long-term without holding a connection open (mostly level 2).
......@@ -831,36 +848,31 @@ struct Exception {
# between the joiner and the host of the joined object, and this connection must be authenticated.
# Thus, the details are network-dependent.
using SturdyRef = Object;
using SturdyRefHostId = Object;
# **(level 2)**
#
# Identifies the host of a persistent capability which can be restored using a `Restore` message.
# That is, this identifies where the `Restore` message should be sent, but does not provide any
# part of the `Restore` message's content. `SturdyRefHostId` is usually paired with a
# `SturdyRefObjectId`, often in the form of a `SturdyRef`.
#
# `SturdyRefHostId` could be as simple as a network address and public key fingerprint. Or, it
# might be more complicated or abstract. For example, on some kinds of networks, `SturdyRefHostId`
# might be an abstract service name without any information on where that service is physically
# located; the network itself might provide a separate service for mapping such names to locations.
# It could even be the case that a particular service name maps to a group of vats, where any vat
# in the group is able to restore the ref. Such an approach would make `SturdyRefHostId`s more
# robust against changes in network topology.
using SturdyRefObjectId = Object;
# **(mostly level 2)**
#
# Identifies a long-lived capability which can be obtained again in a future connection by sending
# a `Restore` message. A SturdyRef is a lot like a URL, but possibly with additional
# considerations e.g. to support authentication without a certificate authority.
#
# The base RPC protocol does not specify under what conditions a SturdyRef can
# be restored. For example:
# - Do you have to connect to a specific vat to restore the reference?
# - Is just any vat allowed to restore the SturdyRef, or is it tied to a specific vat requiring
# some form of authentication?
#
# At the very least, a SturdyRef must contain at least enough information to determine where to
# connect to restore the ref. Ideally, this information is not a physical machine address, but a
# logical identifier that can be passed to some lookup service to locate an appropriate vat. Using
# a physical machine address would make the network brittle -- a change in topology could
# invalidate all SturdyRefs.
#
# The ref should also contain some kind of signature or certificate which can be used to
# authenticate the vat, to protect against a malicious lookup service without the need for a
# centralized certificate authority.
#
# For example, a simple internet-friendly SturdyRef might contain a DNS host name, a public key
# fingerprint, and a Swiss number (large, unguessable random number;
# http://wiki.erights.org/wiki/Swiss_number) to identify the specific object within that vat.
# This construction does have the disadvantage, though, that a compromised private key could
# invalidate all existing refs that share that key, and a compromise of any one client's storage
# could require revoking all existing refs to that object. Various more-sophisticated mechanisms
# can solve these problems but these are beyond the scope of this protocol.
# A SturdyRefObjectId identifies a persistent object which may be restored later, within the scope
# of some host. The contents of a SturdyRefObjectId are entirely determined by the vat that hosts
# it. In fact, different vats on the same network may actually use different definitions for
# SturdyRefObjectId, so SturdyRefObjectId is not actually parameterized per-network but rather
# per-vat. A SturdyRefObjectId is typically paired with a `SturdyRefHostId` (in a
# `SturdyRef`) which describes how to find a vat capable of restoring the ref.
using ProvisionId = Object;
# **(level 3)**
......@@ -946,12 +958,12 @@ using JoinAnswer = Object;
#
# # Level 0 features -----------------------------------------------
#
# connectToHostOf(ref :SturdyRef) :Connection;
# # Connect to a host which can restore the given SturdyRef. The transport should return a
# # promise which does not resolve until authentication has completed, but allows messages to be
# # pipelined in before that; the transport either queues these messages until authenticated, or
# # sends them encrypted such that only the authentic vat would be able to decrypt them. The
# # latter approach avoids a round trip for authentication.
# connectToRefHost(hostId :SturdyRefHostId) :Connection;
# # Connect to the given SturdyRef host. The transport should return a promise which does not
# # resolve until authentication has completed, but allows messages to be pipelined in before
# # that; the transport either queues these messages until authenticated, or sends them encrypted
# # such that only the authentic vat would be able to decrypt them. The latter approach avoids a
# # round trip for authentication.
# #
# # Once connected, the caller should start by sending a `Restore` message.
#
......
......@@ -804,7 +804,7 @@ const ::capnp::_::RawSchema s_80bdec2a81f867ac = {
0x80bdec2a81f867ac, b_80bdec2a81f867ac.words, 46, d_80bdec2a81f867ac, m_80bdec2a81f867ac,
2, 2, i_80bdec2a81f867ac, nullptr, nullptr
};
static const ::capnp::_::AlignedData<45> b_ec0c922151b8b0a8 = {
static const ::capnp::_::AlignedData<46> b_ec0c922151b8b0a8 = {
{ 0, 0, 0, 0, 5, 0, 5, 0,
168, 176, 184, 81, 33, 146, 12, 236,
0, 0, 0, 0, 1, 0, 1, 0,
......@@ -831,10 +831,10 @@ static const ::capnp::_::AlignedData<45> b_ec0c922151b8b0a8 = {
1, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 1, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
45, 0, 0, 0, 34, 0, 0, 0,
45, 0, 0, 0, 74, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
40, 0, 0, 0, 2, 0, 1, 0,
48, 0, 0, 0, 2, 0, 1, 0,
44, 0, 0, 0, 2, 0, 1, 0,
52, 0, 0, 0, 2, 0, 1, 0,
113, 117, 101, 115, 116, 105, 111, 110,
73, 100, 0, 0, 0, 0, 0, 0,
8, 0, 0, 0, 0, 0, 0, 0,
......@@ -843,7 +843,8 @@ static const ::capnp::_::AlignedData<45> b_ec0c922151b8b0a8 = {
8, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
114, 101, 102, 0, 0, 0, 0, 0,
111, 98, 106, 101, 99, 116, 73, 100,
0, 0, 0, 0, 0, 0, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
......@@ -851,13 +852,13 @@ static const ::capnp::_::AlignedData<45> b_ec0c922151b8b0a8 = {
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, }
};
static const uint16_t m_ec0c922151b8b0a8[] = {0, 1};
static const uint16_t m_ec0c922151b8b0a8[] = {1, 0};
static const uint16_t i_ec0c922151b8b0a8[] = {0, 1};
const ::capnp::_::RawSchema s_ec0c922151b8b0a8 = {
0xec0c922151b8b0a8, b_ec0c922151b8b0a8.words, 45, nullptr, m_ec0c922151b8b0a8,
0xec0c922151b8b0a8, b_ec0c922151b8b0a8.words, 46, nullptr, m_ec0c922151b8b0a8,
0, 2, i_ec0c922151b8b0a8, nullptr, nullptr
};
static const ::capnp::_::AlignedData<45> b_86267432565dee97 = {
static const ::capnp::_::AlignedData<46> b_86267432565dee97 = {
{ 0, 0, 0, 0, 5, 0, 5, 0,
151, 238, 93, 86, 50, 116, 38, 134,
0, 0, 0, 0, 1, 0, 1, 0,
......@@ -884,10 +885,10 @@ static const ::capnp::_::AlignedData<45> b_86267432565dee97 = {
1, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 1, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
45, 0, 0, 0, 34, 0, 0, 0,
45, 0, 0, 0, 74, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
40, 0, 0, 0, 2, 0, 1, 0,
48, 0, 0, 0, 2, 0, 1, 0,
44, 0, 0, 0, 2, 0, 1, 0,
52, 0, 0, 0, 2, 0, 1, 0,
113, 117, 101, 115, 116, 105, 111, 110,
73, 100, 0, 0, 0, 0, 0, 0,
8, 0, 0, 0, 0, 0, 0, 0,
......@@ -896,7 +897,8 @@ static const ::capnp::_::AlignedData<45> b_86267432565dee97 = {
8, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
114, 101, 102, 0, 0, 0, 0, 0,
111, 98, 106, 101, 99, 116, 73, 100,
0, 0, 0, 0, 0, 0, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
......@@ -904,10 +906,10 @@ static const ::capnp::_::AlignedData<45> b_86267432565dee97 = {
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, }
};
static const uint16_t m_86267432565dee97[] = {0, 1};
static const uint16_t m_86267432565dee97[] = {1, 0};
static const uint16_t i_86267432565dee97[] = {0, 1};
const ::capnp::_::RawSchema s_86267432565dee97 = {
0x86267432565dee97, b_86267432565dee97.words, 45, nullptr, m_86267432565dee97,
0x86267432565dee97, b_86267432565dee97.words, 46, nullptr, m_86267432565dee97,
0, 2, i_86267432565dee97, nullptr, nullptr
};
static const ::capnp::_::AlignedData<54> b_9c6a046bfbc1ac5a = {
......@@ -1378,6 +1380,60 @@ const ::capnp::_::RawSchema s_f316944415569081 = {
0xf316944415569081, b_f316944415569081.words, 47, nullptr, m_f316944415569081,
0, 2, i_f316944415569081, nullptr, nullptr
};
static const ::capnp::_::AlignedData<46> b_ce8c7a90684b48ff = {
{ 0, 0, 0, 0, 5, 0, 5, 0,
255, 72, 75, 104, 144, 122, 140, 206,
0, 0, 0, 0, 1, 0, 0, 0,
80, 162, 82, 37, 27, 152, 18, 179,
2, 0, 7, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
17, 0, 0, 0, 210, 0, 0, 0,
29, 0, 0, 0, 7, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
25, 0, 0, 0, 119, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
99, 97, 112, 110, 112, 47, 114, 112,
99, 46, 99, 97, 112, 110, 112, 58,
83, 116, 117, 114, 100, 121, 82, 101,
102, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 1, 0, 1, 0,
8, 0, 0, 0, 3, 0, 4, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
41, 0, 0, 0, 58, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
36, 0, 0, 0, 2, 0, 1, 0,
44, 0, 0, 0, 2, 0, 1, 0,
1, 0, 0, 0, 1, 0, 0, 0,
0, 0, 1, 0, 1, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
41, 0, 0, 0, 74, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
40, 0, 0, 0, 2, 0, 1, 0,
48, 0, 0, 0, 2, 0, 1, 0,
104, 111, 115, 116, 73, 100, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
111, 98, 106, 101, 99, 116, 73, 100,
0, 0, 0, 0, 0, 0, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
18, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, }
};
static const uint16_t m_ce8c7a90684b48ff[] = {0, 1};
static const uint16_t i_ce8c7a90684b48ff[] = {0, 1};
const ::capnp::_::RawSchema s_ce8c7a90684b48ff = {
0xce8c7a90684b48ff, b_ce8c7a90684b48ff.words, 46, nullptr, m_ce8c7a90684b48ff,
0, 2, i_ce8c7a90684b48ff, nullptr, nullptr
};
static const ::capnp::_::AlignedData<46> b_d37007fde1f0027d = {
{ 0, 0, 0, 0, 5, 0, 5, 0,
125, 2, 240, 225, 253, 7, 112, 211,
......@@ -1586,6 +1642,8 @@ CAPNP_DEFINE_STRUCT(
::capnp::rpc::PromisedAnswer);
CAPNP_DEFINE_STRUCT(
::capnp::rpc::PromisedAnswer::Op);
CAPNP_DEFINE_STRUCT(
::capnp::rpc::SturdyRef);
CAPNP_DEFINE_STRUCT(
::capnp::rpc::ThirdPartyCapDescriptor);
CAPNP_DEFINE_STRUCT(
......
......@@ -211,6 +211,14 @@ struct PromisedAnswer::Op {
};
};
struct SturdyRef {
SturdyRef() = delete;
class Reader;
class Builder;
class Pipeline;
};
struct ThirdPartyCapDescriptor {
ThirdPartyCapDescriptor() = delete;
......@@ -259,6 +267,7 @@ extern const ::capnp::_::RawSchema s_fbe1980490e001af;
extern const ::capnp::_::RawSchema s_8523ddc40b86b8b0;
extern const ::capnp::_::RawSchema s_d800b1d6cd6f1ca0;
extern const ::capnp::_::RawSchema s_f316944415569081;
extern const ::capnp::_::RawSchema s_ce8c7a90684b48ff;
extern const ::capnp::_::RawSchema s_d37007fde1f0027d;
extern const ::capnp::_::RawSchema s_d625b7063acf691a;
extern const ::capnp::_::RawSchema s_bbaeda2607b6f958;
......@@ -320,6 +329,9 @@ CAPNP_DECLARE_STRUCT(
CAPNP_DECLARE_STRUCT(
::capnp::rpc::PromisedAnswer::Op, f316944415569081,
1, 0, FOUR_BYTES);
CAPNP_DECLARE_STRUCT(
::capnp::rpc::SturdyRef, ce8c7a90684b48ff,
0, 2, INLINE_COMPOSITE);
CAPNP_DECLARE_STRUCT(
::capnp::rpc::ThirdPartyCapDescriptor, d37007fde1f0027d,
1, 1, INLINE_COMPOSITE);
......@@ -1342,8 +1354,8 @@ public:
inline bool hasQuestionId() const;
inline ::uint32_t getQuestionId() const;
inline bool hasRef() const;
inline ::capnp::ObjectPointer::Reader getRef() const;
inline bool hasObjectId() const;
inline ::capnp::ObjectPointer::Reader getObjectId() const;
private:
::capnp::_::StructReader _reader;
......@@ -1379,9 +1391,9 @@ public:
inline ::uint32_t getQuestionId();
inline void setQuestionId( ::uint32_t value);
inline bool hasRef();
inline ::capnp::ObjectPointer::Builder getRef();
inline ::capnp::ObjectPointer::Builder initRef();
inline bool hasObjectId();
inline ::capnp::ObjectPointer::Builder getObjectId();
inline ::capnp::ObjectPointer::Builder initObjectId();
private:
::capnp::_::StructBuilder _builder;
......@@ -1423,8 +1435,8 @@ public:
inline bool hasQuestionId() const;
inline ::uint32_t getQuestionId() const;
inline bool hasRef() const;
inline ::capnp::ObjectPointer::Reader getRef() const;
inline bool hasObjectId() const;
inline ::capnp::ObjectPointer::Reader getObjectId() const;
private:
::capnp::_::StructReader _reader;
......@@ -1460,9 +1472,9 @@ public:
inline ::uint32_t getQuestionId();
inline void setQuestionId( ::uint32_t value);
inline bool hasRef();
inline ::capnp::ObjectPointer::Builder getRef();
inline ::capnp::ObjectPointer::Builder initRef();
inline bool hasObjectId();
inline ::capnp::ObjectPointer::Builder getObjectId();
inline ::capnp::ObjectPointer::Builder initObjectId();
private:
::capnp::_::StructBuilder _builder;
......@@ -2129,6 +2141,87 @@ private:
friend struct ::capnp::ToDynamic_;
};
class SturdyRef::Reader {
public:
typedef SturdyRef Reads;
Reader() = default;
inline explicit Reader(::capnp::_::StructReader base): _reader(base) {}
inline size_t totalSizeInWords() const {
return _reader.totalSize() / ::capnp::WORDS;
}
inline bool hasHostId() const;
inline ::capnp::ObjectPointer::Reader getHostId() const;
inline bool hasObjectId() const;
inline ::capnp::ObjectPointer::Reader getObjectId() const;
private:
::capnp::_::StructReader _reader;
template <typename T, ::capnp::Kind k>
friend struct ::capnp::ToDynamic_;
template <typename T, ::capnp::Kind k>
friend struct ::capnp::_::PointerHelpers;
template <typename T, ::capnp::Kind k>
friend struct ::capnp::List;
friend class ::capnp::MessageBuilder;
friend class ::capnp::Orphanage;
friend ::kj::StringTree KJ_STRINGIFY(SturdyRef::Reader reader);
};
inline ::kj::StringTree KJ_STRINGIFY(SturdyRef::Reader reader) {
return ::capnp::_::structString<SturdyRef>(reader._reader);
}
class SturdyRef::Builder {
public:
typedef SturdyRef Builds;
Builder() = delete; // Deleted to discourage incorrect usage.
// You can explicitly initialize to nullptr instead.
inline Builder(decltype(nullptr)) {}
inline explicit Builder(::capnp::_::StructBuilder base): _builder(base) {}
inline operator Reader() const { return Reader(_builder.asReader()); }
inline Reader asReader() const { return *this; }
inline size_t totalSizeInWords() { return asReader().totalSizeInWords(); }
inline bool hasHostId();
inline ::capnp::ObjectPointer::Builder getHostId();
inline ::capnp::ObjectPointer::Builder initHostId();
inline bool hasObjectId();
inline ::capnp::ObjectPointer::Builder getObjectId();
inline ::capnp::ObjectPointer::Builder initObjectId();
private:
::capnp::_::StructBuilder _builder;
template <typename T, ::capnp::Kind k>
friend struct ::capnp::ToDynamic_;
friend class ::capnp::Orphanage;
friend ::kj::StringTree KJ_STRINGIFY(SturdyRef::Builder builder);
};
inline ::kj::StringTree KJ_STRINGIFY(SturdyRef::Builder builder) {
return ::capnp::_::structString<SturdyRef>(builder._builder.asReader());
}
class SturdyRef::Pipeline {
public:
typedef SturdyRef Pipelines;
inline Pipeline(decltype(nullptr)): _typeless(nullptr) {}
inline explicit Pipeline(::capnp::ObjectPointer::Pipeline&& typeless)
: _typeless(kj::mv(typeless)) {}
private:
::capnp::ObjectPointer::Pipeline _typeless;
template <typename T, ::capnp::Kind k>
friend struct ::capnp::ToDynamic_;
};
class ThirdPartyCapDescriptor::Reader {
public:
typedef ThirdPartyCapDescriptor Reads;
......@@ -3837,21 +3930,21 @@ inline void Restore::Builder::setQuestionId( ::uint32_t value) {
0 * ::capnp::ELEMENTS, value);
}
inline bool Restore::Reader::hasRef() const {
inline bool Restore::Reader::hasObjectId() const {
return !_reader.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline bool Restore::Builder::hasRef() {
inline bool Restore::Builder::hasObjectId() {
return !_builder.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline ::capnp::ObjectPointer::Reader Restore::Reader::getRef() const {
inline ::capnp::ObjectPointer::Reader Restore::Reader::getObjectId() const {
return ::capnp::ObjectPointer::Reader(
_reader.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder Restore::Builder::getRef() {
inline ::capnp::ObjectPointer::Builder Restore::Builder::getObjectId() {
return ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder Restore::Builder::initRef() {
inline ::capnp::ObjectPointer::Builder Restore::Builder::initObjectId() {
auto result = ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
result.clear();
......@@ -3879,21 +3972,21 @@ inline void Delete::Builder::setQuestionId( ::uint32_t value) {
0 * ::capnp::ELEMENTS, value);
}
inline bool Delete::Reader::hasRef() const {
inline bool Delete::Reader::hasObjectId() const {
return !_reader.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline bool Delete::Builder::hasRef() {
inline bool Delete::Builder::hasObjectId() {
return !_builder.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline ::capnp::ObjectPointer::Reader Delete::Reader::getRef() const {
inline ::capnp::ObjectPointer::Reader Delete::Reader::getObjectId() const {
return ::capnp::ObjectPointer::Reader(
_reader.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder Delete::Builder::getRef() {
inline ::capnp::ObjectPointer::Builder Delete::Builder::getObjectId() {
return ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder Delete::Builder::initRef() {
inline ::capnp::ObjectPointer::Builder Delete::Builder::initObjectId() {
auto result = ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
result.clear();
......@@ -4512,6 +4605,48 @@ inline void PromisedAnswer::Op::Builder::setGetPointerField( ::uint16_t value) {
1 * ::capnp::ELEMENTS, value);
}
inline bool SturdyRef::Reader::hasHostId() const {
return !_reader.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline bool SturdyRef::Builder::hasHostId() {
return !_builder.getPointerField(0 * ::capnp::POINTERS).isNull();
}
inline ::capnp::ObjectPointer::Reader SturdyRef::Reader::getHostId() const {
return ::capnp::ObjectPointer::Reader(
_reader.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder SturdyRef::Builder::getHostId() {
return ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder SturdyRef::Builder::initHostId() {
auto result = ::capnp::ObjectPointer::Builder(
_builder.getPointerField(0 * ::capnp::POINTERS));
result.clear();
return result;
}
inline bool SturdyRef::Reader::hasObjectId() const {
return !_reader.getPointerField(1 * ::capnp::POINTERS).isNull();
}
inline bool SturdyRef::Builder::hasObjectId() {
return !_builder.getPointerField(1 * ::capnp::POINTERS).isNull();
}
inline ::capnp::ObjectPointer::Reader SturdyRef::Reader::getObjectId() const {
return ::capnp::ObjectPointer::Reader(
_reader.getPointerField(1 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder SturdyRef::Builder::getObjectId() {
return ::capnp::ObjectPointer::Builder(
_builder.getPointerField(1 * ::capnp::POINTERS));
}
inline ::capnp::ObjectPointer::Builder SturdyRef::Builder::initObjectId() {
auto result = ::capnp::ObjectPointer::Builder(
_builder.getPointerField(1 * ::capnp::POINTERS));
result.clear();
return result;
}
inline bool ThirdPartyCapDescriptor::Reader::hasId() const {
return !_reader.getPointerField(0 * ::capnp::POINTERS).isNull();
}
......
This diff is collapsed.
......@@ -868,5 +868,81 @@ void checkDynamicTestMessageAllZero(DynamicStruct::Reader reader) {
dynamicCheckTestMessageAllZero(reader);
}
// =======================================================================================
// Interface implementations.
TestInterfaceImpl::TestInterfaceImpl(int& callCount): callCount(callCount) {}
::kj::Promise<void> TestInterfaceImpl::foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) {
++callCount;
EXPECT_EQ(123, params.getI());
EXPECT_TRUE(params.getJ());
result.setX("foo");
return kj::READY_NOW;
}
::kj::Promise<void> TestInterfaceImpl::bazAdvanced(
::capnp::CallContext<test::TestInterface::BazParams,
test::TestInterface::BazResults> context) {
++callCount;
auto params = context.getParams();
checkTestMessage(params.getS());
context.releaseParams();
EXPECT_ANY_THROW(context.getParams());
return kj::READY_NOW;
}
TestExtendsImpl::TestExtendsImpl(int& callCount): callCount(callCount) {}
::kj::Promise<void> TestExtendsImpl::foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) {
++callCount;
EXPECT_EQ(321, params.getI());
EXPECT_FALSE(params.getJ());
result.setX("bar");
return kj::READY_NOW;
}
::kj::Promise<void> TestExtendsImpl::graultAdvanced(
::capnp::CallContext<test::TestExtends::GraultParams, test::TestAllTypes> context) {
++callCount;
context.releaseParams();
initTestMessage(context.getResults());
return kj::READY_NOW;
}
TestPipelineImpl::TestPipelineImpl(int& callCount): callCount(callCount) {}
::kj::Promise<void> TestPipelineImpl::getCapAdvanced(
capnp::CallContext<test::TestPipeline::GetCapParams,
test::TestPipeline::GetCapResults> context) {
++callCount;
auto params = context.getParams();
EXPECT_EQ(234, params.getN());
auto cap = params.getInCap();
context.releaseParams();
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
return request.send().then(
[this,context](capnp::Response<test::TestInterface::FooResults>&& response) mutable {
EXPECT_EQ("foo", response.getX());
auto result = context.getResults();
result.setS("bar");
result.initOutBox().setCap(kj::heap<TestExtendsImpl>(callCount));
});
}
} // namespace _ (private)
} // namespace capnp
......@@ -141,6 +141,52 @@ void checkList(T reader, std::initializer_list<ReaderFor<Element>> expected) {
#undef as
// =======================================================================================
// Interface implementations.
class TestInterfaceImpl final: public test::TestInterface::Server {
public:
TestInterfaceImpl(int& callCount);
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override;
::kj::Promise<void> bazAdvanced(
::capnp::CallContext<test::TestInterface::BazParams,
test::TestInterface::BazResults> context) override;
private:
int& callCount;
};
class TestExtendsImpl final: public test::TestExtends::Server {
public:
TestExtendsImpl(int& callCount);
::kj::Promise<void> foo(
test::TestInterface::FooParams::Reader params,
test::TestInterface::FooResults::Builder result) override;
::kj::Promise<void> graultAdvanced(
::capnp::CallContext<test::TestExtends::GraultParams, test::TestAllTypes> context) override;
private:
int& callCount;
};
class TestPipelineImpl final: public test::TestPipeline::Server {
public:
TestPipelineImpl(int& callCount);
::kj::Promise<void> getCapAdvanced(
capnp::CallContext<test::TestPipeline::GetCapParams,
test::TestPipeline::GetCapResults> context) override;
private:
int& callCount;
};
} // namespace _ (private)
} // namespace capnp
......
......@@ -594,12 +594,16 @@ interface TestPipeline {
}
}
struct TestSturdyRef {
struct TestSturdyRefHostId {
host @0 :Text;
tag @1 :Tag;
}
struct TestSturdyRefObjectId {
tag @0 :Tag;
enum Tag {
testInterface @0;
testPipeline @1;
testExtends @1;
testPipeline @2;
}
}
......
......@@ -108,5 +108,62 @@ TEST(AsyncIo, AddressParsing) {
EXPECT_EQ("unix:foo/bar/baz", tryParseRemote(loop, network, "unix:foo/bar/baz"));
}
TEST(AsyncIo, OneWayPipe) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler);
auto pipe = newOneWayPipe();
char receiveBuffer[4];
tasks.add(loop.evalLater([&]() {
return pipe.out->write("foo", 3);
}));
kj::String result = loop.wait(loop.evalLater([&]() {
return pipe.in->tryRead(receiveBuffer, 3, 4)
.then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
});
}));
EXPECT_EQ("foo", result);
}
TEST(AsyncIo, TwoWayPipe) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
auto pipe = newTwoWayPipe();
char receiveBuffer1[4];
char receiveBuffer2[4];
auto promise = loop.evalLater([&]() {
return pipe.ends[0]->write("foo", 3)
.then([&]() {
return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer1, n);
});
});
kj::String result = loop.wait(loop.evalLater([&]() {
return pipe.ends[1]->write("bar", 3)
.then([&]() {
return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer2, n);
});
}));
kj::String result2 = loop.wait(kj::mv(promise));
EXPECT_EQ("foo", result);
EXPECT_EQ("bar", result2);
}
} // namespace
} // namespace kj
......@@ -649,6 +649,26 @@ Own<AsyncIoStream> AsyncIoStream::wrapFd(int fd) {
return heap<AsyncStreamFd>(fd, fd);
}
OneWayPipe newOneWayPipe() {
int fds[2];
#if __linux__
KJ_SYSCALL(pipe2(fds, O_NONBLOCK | O_CLOEXEC));
#else
KJ_SYSCALL(pipe(fds));
#endif
return OneWayPipe { heap<Socket>(fds[0]), heap<Socket>(fds[1]) };
}
TwoWayPipe newTwoWayPipe() {
int fds[2];
int type = SOCK_STREAM;
#if __linux__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif
KJ_SYSCALL(socketpair(AF_UNIX, type, 0, fds));
return TwoWayPipe { { heap<Socket>(fds[0]), heap<Socket>(fds[1]) } };
}
OperatingSystem& getOperatingSystemSingleton() {
static UnixKernel os;
return os;
......
......@@ -60,7 +60,6 @@ class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
// A combination input and output stream.
public:
static Own<AsyncIoStream> wrapFd(int fd);
// Create an AsyncIoStream wrapping a file descriptor.
//
......@@ -151,6 +150,21 @@ public:
// virtual Directory& getRootDir() = 0;
};
struct OneWayPipe {
Own<AsyncInputStream> in;
Own<AsyncOutputStream> out;
};
OneWayPipe newOneWayPipe();
// Creates an input/output stream pair representing the ends of a one-way OS pipe (created with
// pipe(2)).
struct TwoWayPipe {
Own<AsyncIoStream> ends[2];
};
TwoWayPipe newTwoWayPipe();
// Creates two AsyncIoStreams representing the two ends of a two-way OS pipe (created with
// socketpair(2)). Data written to one end can be read from the other.
OperatingSystem& getOperatingSystemSingleton();
// Get the EVIL singleton instance of OperatingSystem representing the real kernel.
//
......
......@@ -228,20 +228,20 @@ public:
}
template <typename Func>
auto map(Func&& f) -> Maybe<decltype(f(instance<T&>()))> {
auto map(Func&& f) -> Maybe<decltype(f(instance<Own<T>&>()))> {
if (ptr == nullptr) {
return nullptr;
} else {
return f(*ptr);
return f(ptr);
}
}
template <typename Func>
auto map(Func&& f) const -> Maybe<decltype(f(instance<const T&>()))> {
auto map(Func&& f) const -> Maybe<decltype(f(instance<const Own<T>&>()))> {
if (ptr == nullptr) {
return nullptr;
} else {
return f(*ptr);
return f(ptr);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment