Commit 5f6baed9 authored by Kenton Varda's avatar Kenton Varda

First tests of RPC protocol implementation.

parent 9575df0c
......@@ -112,6 +112,10 @@ kj::Own<const ClientHook> newBrokenCap(const char* reason) {
return kj::refcounted<BrokenClient>(reason);
}
kj::Own<const ClientHook> newBrokenCap(kj::Exception&& reason) {
return kj::refcounted<BrokenClient>(kj::mv(reason));
}
Arena::~Arena() noexcept(false) {}
BuilderArena::~BuilderArena() noexcept(false) {}
......@@ -412,20 +416,26 @@ SegmentBuilder* ImbuedBuilderArena::imbue(SegmentBuilder* baseSegment) {
result = &segment0;
} else {
auto lock = moreSegments.lockExclusive();
KJ_IF_MAYBE(segmentState, *lock) {
auto id = baseSegment->getSegmentId().value;
if (id >= segmentState->get()->builders.size()) {
segmentState->get()->builders.resize(id + 1);
}
KJ_IF_MAYBE(segment, segmentState->get()->builders[id]) {
result = *segment;
} else {
auto newBuilder = kj::heap<ImbuedSegmentBuilder>(this, baseSegment);
result = newBuilder;
segmentState->get()->builders[id] = kj::mv(newBuilder);
}
MultiSegmentState* segmentState;
KJ_IF_MAYBE(s, *lock) {
segmentState = *s;
} else {
auto newState = kj::heap<MultiSegmentState>();
segmentState = newState;
*lock = kj::mv(newState);
}
auto id = baseSegment->getSegmentId().value;
if (id >= segmentState->builders.size()) {
segmentState->builders.resize(id + 1);
}
KJ_IF_MAYBE(segment, segmentState->builders[id]) {
result = *segment;
} else {
auto newBuilder = kj::heap<ImbuedSegmentBuilder>(this, baseSegment);
result = newBuilder;
segmentState->builders[id] = kj::mv(newBuilder);
}
return nullptr;
}
KJ_DASSERT(result->getArray().begin() == baseSegment->getArray().begin());
......
......@@ -32,6 +32,7 @@
#include <unordered_map>
#include <kj/common.h>
#include <kj/mutex.h>
#include <kj/exception.h>
#include "common.h"
#include "message.h"
#include "layout.h"
......@@ -58,6 +59,7 @@ class Segment;
typedef kj::Id<uint32_t, Segment> SegmentId;
kj::Own<const ClientHook> newBrokenCap(const char* reason);
kj::Own<const ClientHook> newBrokenCap(kj::Exception&& reason);
// Helper function that creates a capability which simply throws exceptions when called.
// Implemented in arena.c++ rather than capability.c++ because it is needed by layout.c++ and we
// don't want capability.c++ to be required by people not using caps.
......
......@@ -77,6 +77,10 @@ kj::Own<const ClientHook> newBrokenCap(const char* reason) {
return _::newBrokenCap(reason);
}
kj::Own<const ClientHook> newBrokenCap(kj::Exception&& reason) {
return _::newBrokenCap(kj::mv(reason));
}
// =======================================================================================
namespace {
......@@ -384,6 +388,10 @@ public:
CallContext<ObjectPointer, ObjectPointer>(*contextPtr));
});
// Make sure that this client cannot be destroyed until the promise completes.
promise = eventLoop.there(kj::mv(promise), kj::mvCapture(kj::addRef(*this),
[=](kj::Own<const LocalClient>&& ref) {}));
// We have to fork this promise for the pipeline to receive a copy of the answer.
auto forked = eventLoop.fork(kj::mv(promise));
......@@ -393,7 +401,7 @@ public:
return kj::refcounted<LocalPipeline>(kj::mv(context));
}));
auto completionPromise = eventLoop.there(forked.addBranch(), kj::mvCapture(context->addRef(),
auto completionPromise = eventLoop.there(forked.addBranch(), kj::mvCapture(context,
[=](kj::Own<CallContextHook>&& context) {
// Nothing to do here. We just wanted to make sure to hold on to a reference to the
// context even if the pipeline was discarded.
......
......@@ -208,6 +208,12 @@ struct ObjectPointer {
inline Reader asReader() const { return Reader(builder.asReader()); }
inline operator Reader() const { return Reader(builder.asReader()); }
inline void setInternal(_::StructReader value) { builder.setStruct(value); }
// For internal use.
//
// TODO(cleanup): RPC implementation uses this, but wouldn't have to if we had an AnyStruct
// type, which would be useful anyawy.
private:
_::PointerBuilder builder;
friend class Orphanage;
......@@ -254,9 +260,15 @@ public:
Orphan(Orphan&&) = default;
Orphan& operator=(Orphan&&) = default;
template <typename T>
inline Orphan(Orphan<T>&& other): builder(kj::mv(other.builder)) {}
template <typename T>
inline Orphan& operator=(Orphan<T>&& other) { builder = kj::mv(other.builder); }
// Cast from typed orphan.
// It's not possible to get an ObjectPointer::{Reader,Builder} directly since there is no
// underlying pointer (the pointer would normally live in the parent, but this object is
// orphaned). It is possible, however, to request readers/builders.
// orphaned). It is possible, however, to request typed readers/builders.
template <typename T>
inline BuilderFor<T> getAs();
......
This diff is collapsed.
This diff is collapsed.
......@@ -35,6 +35,8 @@ namespace capnp {
// ***************************************************************************************
// =======================================================================================
// TODO(cleanup): Put these in rpc-internal.h?
class OutgoingRpcMessage;
class IncomingRpcMessage;
......@@ -78,8 +80,9 @@ public:
class RpcSystemBase {
public:
RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer,
RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer,
const kj::EventLoop& eventLoop);
RpcSystemBase(RpcSystemBase&& other);
~RpcSystemBase() noexcept(false);
private:
......@@ -189,7 +192,7 @@ public:
// on the new connection will be an `Accept` message.
private:
void baseIntroduceTo(Connection& recipient,
void baseIntroduceTo(VatNetworkBase::Connection& recipient,
ObjectPointer::Builder sendToRecipient,
ObjectPointer::Builder sendToTarget) override final;
_::VatNetworkBase::ConnectionAndProvisionId baseConnectToIntroduced(
......@@ -232,7 +235,8 @@ class SturdyRefRestorer: public _::SturdyRefRestorerBase {
public:
virtual Capability::Client restore(typename SturdyRef::Reader ref) = 0;
// Restore the given SturdyRef, returning a capability representing it.
// Restore the given SturdyRef, returning a capability representing it. This is guaranteed only
// to be called on the RpcSystem's EventLoop's thread.
private:
Capability::Client baseRestore(ObjectPointer::Reader ref) override final;
......@@ -246,6 +250,7 @@ public:
RpcSystem(
VatNetwork<OutgoingSturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>& network,
kj::Maybe<SturdyRefRestorer<IncomingSturdyRef>&> restorer, const kj::EventLoop& eventLoop);
RpcSystem(RpcSystem&& other) = default;
Capability::Client connect(typename OutgoingSturdyRef::Reader ref);
// Restore the given SturdyRef from the network and return the capability representing it.
......@@ -255,7 +260,7 @@ template <typename OutgoingSturdyRef, typename IncomingSturdyRef,
typename ProvisionId, typename RecipientId, typename ThirdPartyCapId, typename JoinAnswer>
RpcSystem<OutgoingSturdyRef, IncomingSturdyRef> makeRpcServer(
VatNetwork<OutgoingSturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>& network,
kj::Maybe<SturdyRefRestorer<IncomingSturdyRef>&> restorer,
SturdyRefRestorer<IncomingSturdyRef>& restorer,
const kj::EventLoop& eventLoop = kj::EventLoop::current());
// Make an RPC server. Typical usage (e.g. in a main() function):
//
......@@ -270,7 +275,7 @@ template <typename OutgoingSturdyRef, typename ProvisionId,
RpcSystem<OutgoingSturdyRef> makeRpcClient(
VatNetwork<OutgoingSturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>& network,
const kj::EventLoop& eventLoop = kj::EventLoop::current());
// Make an RPC server. Typical usage (e.g. in a main() function):
// Make an RPC client. Typical usage (e.g. in a main() function):
//
// MyEventLoop eventLoop;
// MyNetwork network(eventLoop);
......@@ -289,10 +294,10 @@ RpcSystem<OutgoingSturdyRef> makeRpcClient(
template <typename SturdyRef, typename ProvisionId, typename RecipientId,
typename ThirdPartyCapId, typename JoinAnswer>
void VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>::
Connection::baseIntroduceTo(Connection& recipient,
Connection::baseIntroduceTo(VatNetworkBase::Connection& recipient,
ObjectPointer::Builder sendToRecipient,
ObjectPointer::Builder sendToTarget) {
introduceTo(recipient, sendToRecipient.initAs<ThirdPartyCapId>(),
introduceTo(kj::downcast<Connection>(recipient), sendToRecipient.initAs<ThirdPartyCapId>(),
sendToTarget.initAs<RecipientId>());
}
......@@ -351,6 +356,22 @@ Capability::Client RpcSystem<OutgoingSturdyRef, IncomingSturdyRef>::connect(
return baseConnect(_::PointerHelpers<OutgoingSturdyRef>::getInternalReader(ref));
}
template <typename OutgoingSturdyRef, typename IncomingSturdyRef,
typename ProvisionId, typename RecipientId, typename ThirdPartyCapId, typename JoinAnswer>
RpcSystem<OutgoingSturdyRef, IncomingSturdyRef> makeRpcServer(
VatNetwork<OutgoingSturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>& network,
SturdyRefRestorer<IncomingSturdyRef>& restorer, const kj::EventLoop& eventLoop) {
return RpcSystem<OutgoingSturdyRef, IncomingSturdyRef>(network, restorer, eventLoop);
}
template <typename OutgoingSturdyRef, typename ProvisionId,
typename RecipientId, typename ThirdPartyCapId, typename JoinAnswer>
RpcSystem<OutgoingSturdyRef> makeRpcClient(
VatNetwork<OutgoingSturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>& network,
const kj::EventLoop& eventLoop) {
return RpcSystem<OutgoingSturdyRef>(network, nullptr, eventLoop);
}
} // namespace capnp
#endif // CAPNP_RPC_H_
......@@ -593,3 +593,17 @@ interface TestPipeline {
cap @0 :TestInterface;
}
}
struct TestSturdyRef {
host @0 :Text;
tag @1 :Tag;
enum Tag {
testInterface @0;
testPipeline @1;
}
}
struct TestProvisionId {}
struct TestRecipientId {}
struct TestThirdPartyCapId {}
struct TestJoinAnswer {}
......@@ -238,6 +238,15 @@ TEST(Async, SeparateFulfillerChained) {
#define EXPECT_ANY_THROW(code) EXPECT_DEATH(code, ".")
#endif
TEST(Async, SeparateFulfillerDiscarded) {
SimpleEventLoop loop;
auto pair = newPromiseAndFulfiller<int>();
pair.fulfiller = nullptr;
EXPECT_ANY_THROW(loop.wait(kj::mv(pair.promise)));
}
TEST(Async, Threads) {
EXPECT_ANY_THROW(EventLoop::current());
......
......@@ -23,6 +23,7 @@
#include "async.h"
#include "debug.h"
#include "vector.h"
#include <exception>
#include <map>
......@@ -271,6 +272,17 @@ public:
inline Impl(const EventLoop& loop, ErrorHandler& errorHandler)
: loop(loop), errorHandler(errorHandler) {}
~Impl() noexcept(false) {
// std::map doesn't like it when elements' destructors throw, so carefully disassemble it.
auto& taskMap = tasks.getWithoutLock();
if (!taskMap.empty()) {
Vector<Own<Task>> deleteMe(taskMap.size());
for (auto& entry: taskMap) {
deleteMe.add(kj::mv(entry.second));
}
}
}
class Task final: public EventLoop::Event {
public:
Task(const Impl& taskSet, Own<_::PromiseNode>&& nodeParam)
......@@ -281,6 +293,10 @@ public:
}
}
~Task() {
disarm();
}
protected:
void fire() override {
// Get the result.
......@@ -387,6 +403,7 @@ bool TransformPromiseNodeBase::onReady(EventLoop::Event& event) noexcept {
void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
getImpl(output);
dropDependency();
})) {
output.addException(kj::mv(*exception));
}
......
......@@ -1242,29 +1242,33 @@ public:
: adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
void get(ExceptionOrValue& output) noexcept override {
KJ_IREQUIRE(!isWaiting());
output.as<T>() = kj::mv(result);
}
private:
Adapter adapter;
ExceptionOr<T> result;
bool waiting = true;
void fulfill(T&& value) override {
if (isWaiting()) {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(kj::mv(value));
setReady();
}
}
void reject(Exception&& exception) override {
if (isWaiting()) {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(false, kj::mv(exception));
setReady();
}
}
bool isWaiting() override {
return result.value == nullptr && result.exception == nullptr;
return waiting;
}
};
......@@ -1375,10 +1379,27 @@ Promise<_::Forked<T>> ForkedPromise<T>::addBranch() const {
namespace _ { // private
template <typename T>
class WeakFulfiller final: public PromiseFulfiller<T> {
class WeakFulfiller final: public PromiseFulfiller<T>, private kj::Disposer {
// A wrapper around PromiseFulfiller which can be detached.
//
// There are a couple non-trivialities here:
// - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
// rejected.
// - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
// detached from the underlying fulfiller, because otherwise the later detach() call will go
// to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
// refcount never goes over 2 and we manually implement the refcounting because we already need
// a mutex anyway. To this end, WeakFulfiller is its own Disposer -- dispose() is called when
// the application discards its owned pointer to the fulfiller and detach() is called when the
// promise is destroyed.
public:
WeakFulfiller(): inner(nullptr) {}
KJ_DISALLOW_COPY(WeakFulfiller);
static kj::Own<WeakFulfiller> make() {
WeakFulfiller* ptr = new WeakFulfiller;
return Own<WeakFulfiller>(ptr, *ptr);
}
void fulfill(FixVoid<T>&& value) override {
auto lock = inner.lockExclusive();
......@@ -1403,12 +1424,39 @@ public:
inner.getWithoutLock() = &newInner;
}
void detach() {
*inner.lockExclusive() = nullptr;
void detach(PromiseFulfiller<T>& from) {
auto lock = inner.lockExclusive();
if (*lock == nullptr) {
// Already disposed.
lock.release();
delete this;
} else {
KJ_IREQUIRE(*lock == &from);
*lock = nullptr;
}
}
private:
MutexGuarded<PromiseFulfiller<T>*> inner;
WeakFulfiller(): inner(nullptr) {}
void disposeImpl(void* pointer) const override {
// TODO(perf): Factor some of this out so it isn't regenerated for every fulfiller type?
auto lock = inner.lockExclusive();
if (*lock == nullptr) {
// Already detached.
lock.release();
delete this;
} else if ((*lock)->isWaiting()) {
(*lock)->reject(kj::Exception(
kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__,
kj::heapString("PromiseFulfiller was destroyed without fulfilling the promise.")));
*lock = nullptr;
}
}
};
template <typename T>
......@@ -1416,15 +1464,16 @@ class PromiseAndFulfillerAdapter {
public:
PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
WeakFulfiller<T>& wrapper)
: wrapper(wrapper) {
: fulfiller(fulfiller), wrapper(wrapper) {
wrapper.attach(fulfiller);
}
~PromiseAndFulfillerAdapter() {
wrapper.detach();
~PromiseAndFulfillerAdapter() noexcept(false) {
wrapper.detach(fulfiller);
}
private:
PromiseFulfiller<T>& fulfiller;
WeakFulfiller<T>& wrapper;
};
......@@ -1438,7 +1487,7 @@ Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
template <typename T>
PromiseFulfillerPair<T> newPromiseAndFulfiller() {
auto wrapper = heap<_::WeakFulfiller<T>>();
auto wrapper = _::WeakFulfiller<T>::make();
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
......@@ -1450,7 +1499,7 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() {
template <typename T>
PromiseFulfillerPair<T> newPromiseAndFulfiller(const EventLoop& loop) {
auto wrapper = heap<_::WeakFulfiller<T>>();
auto wrapper = _::WeakFulfiller<T>::make();
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment