Commit 3c7efbb4 authored by Kenton Varda's avatar Kenton Varda

Eliminate the concept of imbuing messages in favor of the simpler concept of…

Eliminate the concept of imbuing messages in favor of the simpler concept of setting a cap table directly on MessageReader / getting one from MessageBuilder.  This eliminates capability-context entirely.  This was made possible by the earlier change which moved capabilities to a separate table rather than storing CapDescriptors inline, but I didn't realize it at the time.
parent bf5dbebf
......@@ -160,7 +160,6 @@ includecapnp_HEADERS = \
src/capnp/any.h \
src/capnp/message.h \
src/capnp/capability.h \
src/capnp/capability-context.h \
src/capnp/schema.capnp.h \
src/capnp/schema.h \
src/capnp/schema-loader.h \
......@@ -235,7 +234,6 @@ libcapnp_rpc_la_LDFLAGS = -release $(VERSION) -no-undefined
libcapnp_rpc_la_SOURCES= \
src/capnp/serialize-async.c++ \
src/capnp/capability.c++ \
src/capnp/capability-context.c++ \
src/capnp/dynamic-capability.c++ \
src/capnp/rpc.c++ \
src/capnp/rpc.capnp.c++ \
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// This sample code appears in the documentation for the C++ implementation.
//
// Compile with:
......
# Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@0x9eb32e19f86ee174;
using Cxx = import "/capnp/c++.capnp";
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "calculator.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/debug.h>
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "calculator.capnp.h"
#include <kj/debug.h>
#include <capnp/ez-rpc.h>
#include <capnp/capability-context.h> // for LocalMessage
#include <capnp/message.h>
#include <iostream>
typedef unsigned int uint;
......@@ -90,13 +113,15 @@ class FunctionImpl final: public Calculator::Function::Server {
public:
FunctionImpl(uint paramCount, Calculator::Expression::Reader body)
: paramCount(paramCount), body(body) {}
: paramCount(paramCount) {
this->body.setRoot(body);
}
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == paramCount, "Wrong number of parameters.");
return evaluateImpl(body.getRoot().getAs<Calculator::Expression>(), params)
return evaluateImpl(body.getRoot<Calculator::Expression>(), params)
.then([context](double value) mutable {
context.getResults().setValue(value);
});
......@@ -106,10 +131,8 @@ private:
uint paramCount;
// The function's arity.
capnp::LocalMessage body;
// LocalMessage holds a message that might contain capabilities (interface
// references). Here we're using it to hold a Calculator.Expression, which
// might contain Calculator.Function and/or Calculator.Value capabilities.
capnp::MallocMessageBuilder body;
// Stores a permanent copy of the function body.
};
class OperatorImpl final: public Calculator::Function::Server {
......
# Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@0x85150b117366d14b;
interface Calculator {
......
......@@ -25,7 +25,6 @@
#include "arena.h"
#include "message.h"
#include "capability.h"
#include "capability-context.h"
#include <kj/debug.h>
#include <kj/refcount.h>
#include <vector>
......@@ -36,7 +35,6 @@ namespace capnp {
namespace _ { // private
Arena::~Arena() noexcept(false) {}
BuilderArena::~BuilderArena() noexcept(false) {}
void ReadLimiter::unread(WordCount64 amount) {
// Be careful not to overflow here. Since ReadLimiter has no thread-safety, it's possible that
......@@ -51,14 +49,14 @@ void ReadLimiter::unread(WordCount64 amount) {
// =======================================================================================
BasicReaderArena::BasicReaderArena(MessageReader* message)
ReaderArena::ReaderArena(MessageReader* message)
: message(message),
readLimiter(message->getOptions().traversalLimitInWords * WORDS),
segment0(this, SegmentId(0), message->getSegment(0), &readLimiter) {}
BasicReaderArena::~BasicReaderArena() noexcept(false) {}
ReaderArena::~ReaderArena() noexcept(false) {}
SegmentReader* BasicReaderArena::tryGetSegment(SegmentId id) {
SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
if (id == SegmentId(0)) {
if (segment0.getArray() == nullptr) {
return nullptr;
......@@ -96,102 +94,41 @@ SegmentReader* BasicReaderArena::tryGetSegment(SegmentId id) {
return result;
}
void BasicReaderArena::reportReadLimitReached() {
void ReaderArena::reportReadLimitReached() {
KJ_FAIL_REQUIRE("Exceeded message traversal limit. See capnp::ReaderOptions.") {
return;
}
}
kj::Maybe<kj::Own<ClientHook>> BasicReaderArena::extractCap(uint index) {
return nullptr;
}
// =======================================================================================
ImbuedReaderArena::ImbuedReaderArena(Arena* base, BrokenCapFactory& brokenCapFactory,
kj::Array<kj::Own<ClientHook>>&& capTable)
: base(base), brokenCapFactory(brokenCapFactory), capTable(kj::mv(capTable)),
segment0(nullptr) {}
ImbuedReaderArena::~ImbuedReaderArena() noexcept(false) {}
SegmentReader* ImbuedReaderArena::imbue(SegmentReader* baseSegment) {
if (baseSegment == nullptr) return nullptr;
if (baseSegment->getSegmentId() == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
kj::dtor(segment0);
kj::ctor(segment0, this, baseSegment);
}
KJ_DASSERT(segment0.getArray().begin() == baseSegment->getArray().begin());
return &segment0;
}
auto lock = moreSegments.lockExclusive();
SegmentMap* segments = nullptr;
KJ_IF_MAYBE(s, *lock) {
auto iter = s->get()->find(baseSegment);
if (iter != s->get()->end()) {
KJ_DASSERT(iter->second->getArray().begin() == baseSegment->getArray().begin());
return iter->second;
}
segments = *s;
} else {
auto newMap = kj::heap<SegmentMap>();
segments = newMap;
*lock = kj::mv(newMap);
}
auto newSegment = kj::heap<ImbuedSegmentReader>(this, baseSegment);
SegmentReader* result = newSegment;
segments->insert(std::make_pair(baseSegment, mv(newSegment)));
return result;
}
SegmentReader* ImbuedReaderArena::tryGetSegment(SegmentId id) {
return imbue(base->tryGetSegment(id));
}
void ImbuedReaderArena::reportReadLimitReached() {
return base->reportReadLimitReached();
}
kj::Maybe<kj::Own<ClientHook>> ImbuedReaderArena::extractCap(uint index) {
kj::Maybe<kj::Own<ClientHook>> ReaderArena::extractCap(uint index) {
if (index < capTable.size()) {
return capTable[index]->addRef();
return capTable[index].map([](kj::Own<ClientHook>& cap) { return cap->addRef(); });
} else {
KJ_FAIL_ASSERT("Invalid capability descriptor in message.") {
// Work around http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 and
// http://llvm.org/bugs/show_bug.cgi?id=12286.
break;
}
return brokenCapFactory.newBrokenCap("Calling capability from invalid descriptor.");
return nullptr;
}
}
// =======================================================================================
BasicBuilderArena::BasicBuilderArena(MessageBuilder* message)
BuilderArena::BuilderArena(MessageBuilder* message)
: message(message), segment0(nullptr, SegmentId(0), nullptr, nullptr) {}
BasicBuilderArena::~BasicBuilderArena() noexcept(false) {}
BuilderArena::~BuilderArena() noexcept(false) {}
SegmentBuilder* BasicBuilderArena::getSegment(SegmentId id) {
SegmentBuilder* BuilderArena::getSegment(SegmentId id) {
// This method is allowed to fail if the segment ID is not valid.
if (id == SegmentId(0)) {
return &segment0;
} else {
KJ_IF_MAYBE(s, moreSegments) {
KJ_REQUIRE(id.value - 1 < s->get()->builders.size(), "invalid segment id", id.value);
// TODO(cleanup): Return a const SegmentBuilder and tediously constify all SegmentBuilder
// pointers throughout the codebase.
return const_cast<BasicSegmentBuilder*>(s->get()->builders[id.value - 1].get());
return const_cast<SegmentBuilder*>(s->get()->builders[id.value - 1].get());
} else {
KJ_FAIL_REQUIRE("invalid segment id", id.value);
}
}
}
BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount) {
BuilderArena::AllocateResult BuilderArena::allocate(WordCount amount) {
if (segment0.getArena() == nullptr) {
// We're allocating the first segment.
kj::ArrayPtr<word> ptr = message->allocateSegment(amount / WORDS);
......@@ -230,7 +167,7 @@ BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount)
moreSegments = kj::mv(newSegmentState);
}
kj::Own<BasicSegmentBuilder> newBuilder = kj::heap<BasicSegmentBuilder>(
kj::Own<SegmentBuilder> newBuilder = kj::heap<SegmentBuilder>(
this, SegmentId(segmentState->builders.size() + 1),
message->allocateSegment(amount / WORDS), &this->dummyLimiter);
SegmentBuilder* result = newBuilder.get();
......@@ -245,7 +182,7 @@ BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount)
}
}
kj::ArrayPtr<const kj::ArrayPtr<const word>> BasicBuilderArena::getSegmentsForOutput() {
kj::ArrayPtr<const kj::ArrayPtr<const word>> BuilderArena::getSegmentsForOutput() {
// Although this is a read-only method, we shouldn't need to lock a mutex here because if this
// is called multiple times simultaneously, we should only be overwriting the array with the
// exact same data. If the number or size of segments is actually changing due to an activity
......@@ -276,7 +213,7 @@ kj::ArrayPtr<const kj::ArrayPtr<const word>> BasicBuilderArena::getSegmentsForOu
}
}
SegmentReader* BasicBuilderArena::tryGetSegment(SegmentId id) {
SegmentReader* BuilderArena::tryGetSegment(SegmentId id) {
if (id == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
// We haven't allocated any segments yet.
......@@ -297,102 +234,21 @@ SegmentReader* BasicBuilderArena::tryGetSegment(SegmentId id) {
}
}
void BasicBuilderArena::reportReadLimitReached() {
void BuilderArena::reportReadLimitReached() {
KJ_FAIL_ASSERT("Read limit reached for BuilderArena, but it should have been unlimited.") {
return;
}
}
kj::Maybe<kj::Own<ClientHook>> BasicBuilderArena::extractCap(uint index) {
return nullptr;
}
uint BasicBuilderArena::injectCap(kj::Own<ClientHook>&& cap) {
KJ_FAIL_REQUIRE("Cannot inject capability into a builder that has not been imbued with a "
"capability context.") {
return 0;
}
}
void BasicBuilderArena::dropCap(uint index) {
// They only way we could have a cap in the first place is if the error was already reported...
}
// =======================================================================================
ImbuedBuilderArena::ImbuedBuilderArena(BuilderArena* base, BrokenCapFactory& brokenCapFactory)
: base(base), brokenCapFactory(brokenCapFactory), segment0(nullptr) {}
ImbuedBuilderArena::~ImbuedBuilderArena() noexcept(false) {}
SegmentBuilder* ImbuedBuilderArena::imbue(SegmentBuilder* baseSegment) {
if (baseSegment == nullptr) return nullptr;
SegmentBuilder* result;
if (baseSegment->getSegmentId() == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
kj::dtor(segment0);
kj::ctor(segment0, this, baseSegment);
}
result = &segment0;
} else {
MultiSegmentState* segmentState;
KJ_IF_MAYBE(s, moreSegments) {
segmentState = *s;
} else {
auto newState = kj::heap<MultiSegmentState>();
segmentState = newState;
moreSegments = kj::mv(newState);
}
auto id = baseSegment->getSegmentId().value;
if (id >= segmentState->builders.size()) {
segmentState->builders.resize(id + 1);
}
KJ_IF_MAYBE(segment, segmentState->builders[id]) {
result = *segment;
} else {
auto newBuilder = kj::heap<ImbuedSegmentBuilder>(this, baseSegment);
result = newBuilder;
segmentState->builders[id] = kj::mv(newBuilder);
}
}
KJ_DASSERT(result->getArray().begin() == baseSegment->getArray().begin());
return result;
}
SegmentReader* ImbuedBuilderArena::tryGetSegment(SegmentId id) {
return imbue(static_cast<SegmentBuilder*>(base->tryGetSegment(id)));
}
void ImbuedBuilderArena::reportReadLimitReached() {
base->reportReadLimitReached();
}
kj::Maybe<kj::Own<ClientHook>> ImbuedBuilderArena::extractCap(uint index) {
kj::Maybe<kj::Own<ClientHook>> BuilderArena::extractCap(uint index) {
if (index < capTable.size()) {
return capTable[index]->addRef();
return capTable[index].map([](kj::Own<ClientHook>& cap) { return cap->addRef(); });
} else {
KJ_FAIL_ASSERT("Invalid capability descriptor in message.") {
// Work around http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799 and
// http://llvm.org/bugs/show_bug.cgi?id=12286.
break;
}
return brokenCapFactory.newBrokenCap("Calling capability from invalid descriptor.");
return nullptr;
}
}
SegmentBuilder* ImbuedBuilderArena::getSegment(SegmentId id) {
return imbue(base->getSegment(id));
}
BuilderArena::AllocateResult ImbuedBuilderArena::allocate(WordCount amount) {
auto result = base->allocate(amount);
result.segment = imbue(result.segment);
return result;
}
uint ImbuedBuilderArena::injectCap(kj::Own<ClientHook>&& cap) {
uint BuilderArena::injectCap(kj::Own<ClientHook>&& cap) {
// TODO(perf): Detect if the cap is already on the table and reuse the index? Perhaps this
// doesn't happen enough to be worth the effort.
uint result = capTable.size();
......@@ -400,7 +256,7 @@ uint ImbuedBuilderArena::injectCap(kj::Own<ClientHook>&& cap) {
return result;
}
void ImbuedBuilderArena::dropCap(uint index) {
void BuilderArena::dropCap(uint index) {
KJ_ASSERT(index < capTable.size(), "Invalid capability descriptor in message.") {
return;
}
......
......@@ -47,11 +47,7 @@ namespace _ { // private
class SegmentReader;
class SegmentBuilder;
class Arena;
class BasicReaderArena;
class ImbuedReaderArena;
class BuilderArena;
class BasicBuilderArena;
class ImbuedBuilderArena;
class ReadLimiter;
class Segment;
......@@ -133,20 +129,12 @@ private:
KJ_DISALLOW_COPY(SegmentReader);
friend class SegmentBuilder;
friend class ImbuedSegmentBuilder;
friend class ImbuedSegmentReader;
};
class ImbuedSegmentReader: public SegmentReader {
public:
inline ImbuedSegmentReader(Arena* arena, SegmentReader* base);
inline ImbuedSegmentReader(decltype(nullptr));
};
class SegmentBuilder: public SegmentReader {
public:
inline SegmentBuilder(BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr,
ReadLimiter* readLimiter, word** pos);
ReadLimiter* readLimiter);
KJ_ALWAYS_INLINE(word* allocate(WordCount amount));
inline word* getPtrUnchecked(WordCount offset);
......@@ -158,35 +146,13 @@ public:
inline void reset();
private:
word** pos;
word* pos;
// Pointer to a pointer to the current end point of the segment, i.e. the location where the
// next object should be allocated. The extra level of indirection allows an
// ImbuedSegmentBuilder to share this pointer with the underlying BasicSegmentBuilder.
friend class ImbuedSegmentBuilder;
// next object should be allocated.
KJ_DISALLOW_COPY(SegmentBuilder);
};
class BasicSegmentBuilder: public SegmentBuilder {
public:
inline BasicSegmentBuilder(BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr,
ReadLimiter* readLimiter);
private:
word* actualPos;
KJ_DISALLOW_COPY(BasicSegmentBuilder);
};
class ImbuedSegmentBuilder: public SegmentBuilder {
public:
inline ImbuedSegmentBuilder(ImbuedBuilderArena* arena, SegmentBuilder* base);
inline ImbuedSegmentBuilder(decltype(nullptr));
KJ_DISALLOW_COPY(ImbuedSegmentBuilder);
};
class Arena {
public:
virtual ~Arena() noexcept(false);
......@@ -200,16 +166,21 @@ public:
// will need to continue with default values.
virtual kj::Maybe<kj::Own<ClientHook>> extractCap(uint index) = 0;
// Extract the capability at the given index. If the index is invalid, returns a dummy
// capability whose methods all throw. Returns null only if the message is not imbued with a
// capability context.
// Extract the capability at the given index. If the index is invalid, returns null.
};
class BasicReaderArena final: public Arena {
class ReaderArena final: public Arena {
public:
BasicReaderArena(MessageReader* message);
~BasicReaderArena() noexcept(false);
KJ_DISALLOW_COPY(BasicReaderArena);
ReaderArena(MessageReader* message);
~ReaderArena() noexcept(false);
KJ_DISALLOW_COPY(ReaderArena);
inline void initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable) {
// Imbues the arena with a capability table. This is not passed to the constructor because the
// table itself may be built based on some other part of the message (as is the case with the
// RPC protocol).
this->capTable = kj::mv(capTable);
}
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
......@@ -219,6 +190,7 @@ public:
private:
MessageReader* message;
ReadLimiter readLimiter;
kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable;
// Optimize for single-segment messages so that small messages are handled quickly.
SegmentReader segment0;
......@@ -234,37 +206,25 @@ private:
// possibly backed by the same data)?
};
class ImbuedReaderArena final: public Arena {
public:
ImbuedReaderArena(Arena* base, BrokenCapFactory& brokenCapFactory,
kj::Array<kj::Own<ClientHook>>&& capTable);
~ImbuedReaderArena() noexcept(false);
KJ_DISALLOW_COPY(ImbuedReaderArena);
SegmentReader* imbue(SegmentReader* base);
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
kj::Maybe<kj::Own<ClientHook>> extractCap(uint index);
class BuilderArena final: public Arena {
// A BuilderArena that does not allow the injection of capabilities.
private:
Arena* base;
BrokenCapFactory& brokenCapFactory;
kj::Array<kj::Own<ClientHook>> capTable;
public:
BuilderArena(MessageBuilder* message);
~BuilderArena() noexcept(false);
KJ_DISALLOW_COPY(BuilderArena);
// Optimize for single-segment messages so that small messages are handled quickly.
ImbuedSegmentReader segment0;
inline SegmentBuilder* getRootSegment() { return &segment0; }
typedef std::unordered_map<SegmentReader*, kj::Own<ImbuedSegmentReader>> SegmentMap;
kj::MutexGuarded<kj::Maybe<kj::Own<SegmentMap>>> moreSegments;
};
kj::ArrayPtr<const kj::ArrayPtr<const word>> getSegmentsForOutput();
// Get an array of all the segments, suitable for writing out. This only returns the allocated
// portion of each segment, whereas tryGetSegment() returns something that includes
// not-yet-allocated space.
class BuilderArena: public Arena {
public:
virtual ~BuilderArena() noexcept(false);
inline kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> getCapTable() { return capTable; }
// Return the capability table.
virtual SegmentBuilder* getSegment(SegmentId id) = 0;
SegmentBuilder* getSegment(SegmentId id);
// Get the segment with the given id. Crashes or throws an exception if no such segment exists.
struct AllocateResult {
......@@ -272,99 +232,40 @@ public:
word* words;
};
virtual AllocateResult allocate(WordCount amount) = 0;
AllocateResult allocate(WordCount amount);
// Find a segment with at least the given amount of space available and allocate the space.
// Note that allocating directly from a particular segment is much faster, but allocating from
// the arena is guaranteed to succeed. Therefore callers should try to allocate from a specific
// segment first if there is one, then fall back to the arena.
virtual uint injectCap(kj::Own<ClientHook>&& cap) = 0;
uint injectCap(kj::Own<ClientHook>&& cap);
// Add the capability to the message and return its index. If the same ClientHook is injected
// twice, this may return the same index both times, but in this case dropCap() needs to be
// called an equal number of times to actually remove the cap.
virtual void dropCap(uint index) = 0;
void dropCap(uint index);
// Remove a capability injected earlier. Called when the pointer is overwritten or zero'd out.
};
class BasicBuilderArena final: public BuilderArena {
// A BuilderArena that does not allow the injection of capabilities.
public:
BasicBuilderArena(MessageBuilder* message);
~BasicBuilderArena() noexcept(false);
KJ_DISALLOW_COPY(BasicBuilderArena);
inline SegmentBuilder* getRootSegment() { return &segment0; }
kj::ArrayPtr<const kj::ArrayPtr<const word>> getSegmentsForOutput();
// Get an array of all the segments, suitable for writing out. This only returns the allocated
// portion of each segment, whereas tryGetSegment() returns something that includes
// not-yet-allocated space.
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
kj::Maybe<kj::Own<ClientHook>> extractCap(uint index);
// implements BuilderArena -----------------------------------------
SegmentBuilder* getSegment(SegmentId id) override;
AllocateResult allocate(WordCount amount) override;
uint injectCap(kj::Own<ClientHook>&& cap);
void dropCap(uint index);
private:
MessageBuilder* message;
ReadLimiter dummyLimiter;
kj::Vector<kj::Maybe<kj::Own<ClientHook>>> capTable;
BasicSegmentBuilder segment0;
SegmentBuilder segment0;
kj::ArrayPtr<const word> segment0ForOutput;
struct MultiSegmentState {
kj::Vector<kj::Own<BasicSegmentBuilder>> builders;
kj::Vector<kj::Own<SegmentBuilder>> builders;
kj::Vector<kj::ArrayPtr<const word>> forOutput;
};
kj::Maybe<kj::Own<MultiSegmentState>> moreSegments;
};
class ImbuedBuilderArena final: public BuilderArena {
// A BuilderArena imbued with the ability to inject capabilities.
public:
ImbuedBuilderArena(BuilderArena* base, BrokenCapFactory& brokenCapFactory);
~ImbuedBuilderArena() noexcept(false);
KJ_DISALLOW_COPY(ImbuedBuilderArena);
SegmentBuilder* imbue(SegmentBuilder* baseSegment);
// Return an imbued SegmentBuilder corresponding to the given segment from the base arena.
inline kj::ArrayPtr<kj::Own<ClientHook>> getCapTable() { return capTable; }
// Release and return the capability table.
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
kj::Maybe<kj::Own<ClientHook>> extractCap(uint index);
// implements BuilderArena -----------------------------------------
SegmentBuilder* getSegment(SegmentId id) override;
AllocateResult allocate(WordCount amount) override;
uint injectCap(kj::Own<ClientHook>&& cap);
void dropCap(uint index);
private:
BuilderArena* base;
BrokenCapFactory& brokenCapFactory;
kj::Vector<kj::Own<ClientHook>> capTable;
ImbuedSegmentBuilder segment0;
struct MultiSegmentState {
kj::Vector<kj::Maybe<kj::Own<ImbuedSegmentBuilder>>> builders;
};
kj::Maybe<kj::Own<MultiSegmentState>> moreSegments;
};
// =======================================================================================
inline ReadLimiter::ReadLimiter()
......@@ -411,25 +312,20 @@ inline WordCount SegmentReader::getSize() { return ptr.size() * WORDS; }
inline kj::ArrayPtr<const word> SegmentReader::getArray() { return ptr; }
inline void SegmentReader::unread(WordCount64 amount) { readLimiter->unread(amount); }
inline ImbuedSegmentReader::ImbuedSegmentReader(Arena* arena, SegmentReader* base)
: SegmentReader(arena, base->id, base->ptr, base->readLimiter) {}
inline ImbuedSegmentReader::ImbuedSegmentReader(decltype(nullptr))
: SegmentReader(nullptr, SegmentId(0), nullptr, nullptr) {}
// -------------------------------------------------------------------
inline SegmentBuilder::SegmentBuilder(
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter, word** pos)
: SegmentReader(arena, id, ptr, readLimiter), pos(pos) {}
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter)
: SegmentReader(arena, id, ptr, readLimiter), pos(ptr.begin()) {}
inline word* SegmentBuilder::allocate(WordCount amount) {
if (intervalLength(*pos, ptr.end()) < amount) {
if (intervalLength(pos, ptr.end()) < amount) {
// Not enough space in the segment for this allocation.
return nullptr;
} else {
// Success.
word* result = *pos;
*pos = *pos + amount;
word* result = pos;
pos = pos + amount;
return result;
}
}
......@@ -447,27 +343,15 @@ inline BuilderArena* SegmentBuilder::getArena() {
}
inline kj::ArrayPtr<const word> SegmentBuilder::currentlyAllocated() {
return kj::arrayPtr(ptr.begin(), *pos - ptr.begin());
return kj::arrayPtr(ptr.begin(), pos - ptr.begin());
}
inline void SegmentBuilder::reset() {
word* start = getPtrUnchecked(0 * WORDS);
memset(start, 0, (*pos - start) * sizeof(word));
*pos = start;
memset(start, 0, (pos - start) * sizeof(word));
pos = start;
}
inline BasicSegmentBuilder::BasicSegmentBuilder(
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter)
: SegmentBuilder(arena, id, ptr, readLimiter, &actualPos),
actualPos(ptr.begin()) {}
inline ImbuedSegmentBuilder::ImbuedSegmentBuilder(ImbuedBuilderArena* arena, SegmentBuilder* base)
: SegmentBuilder(arena, base->id,
kj::arrayPtr(const_cast<word*>(base->ptr.begin()), base->ptr.size()),
base->readLimiter, base->pos) {}
inline ImbuedSegmentBuilder::ImbuedSegmentBuilder(decltype(nullptr))
: SegmentBuilder(nullptr, SegmentId(0), nullptr, nullptr, nullptr) {}
} // namespace _ (private)
} // namespace capnp
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define CAPNP_PRIVATE
#include "capability-context.h"
#include "capability.h"
#include "arena.h"
#include <kj/debug.h>
namespace capnp {
namespace _ {
void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.
} // namespace _
namespace {
class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
return capnp::newBrokenCap(description);
}
};
static BrokenCapFactoryImpl brokenCapFactory;
} // namespace
CapReaderContext::CapReaderContext(kj::Array<kj::Own<ClientHook>>&& capTable)
: capTable(kj::mv(capTable)) {
setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}
CapReaderContext::~CapReaderContext() noexcept(false) {
if (capTable == nullptr) {
kj::dtor(arena());
}
}
AnyPointer::Reader CapReaderContext::imbue(AnyPointer::Reader base) {
KJ_IF_MAYBE(oldArena, base.reader.getArena()) {
static_assert(sizeof(arena()) <= sizeof(arenaSpace),
"arenaSpace is too small. Please increase it.");
kj::ctor(arena(), oldArena, brokenCapFactory,
kj::mv(KJ_REQUIRE_NONNULL(capTable, "imbue() can only be called once.")));
} else {
KJ_FAIL_REQUIRE("Cannot imbue unchecked message.");
}
capTable = nullptr;
return AnyPointer::Reader(base.reader.imbue(arena()));
}
CapBuilderContext::CapBuilderContext() {
setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}
CapBuilderContext::~CapBuilderContext() noexcept(false) {
if (arenaAllocated) {
kj::dtor(arena());
}
}
AnyPointer::Builder CapBuilderContext::imbue(AnyPointer::Builder base) {
KJ_REQUIRE(!arenaAllocated, "imbue() can only be called once.");
static_assert(sizeof(arena()) <= sizeof(arenaSpace),
"arenaSpace is too small. Please increase it.");
kj::ctor(arena(), base.builder.getArena(), brokenCapFactory);
arenaAllocated = true;
return AnyPointer::Builder(base.builder.imbue(arena()));
}
kj::ArrayPtr<kj::Own<ClientHook>> CapBuilderContext::getCapTable() {
if (arenaAllocated) {
return arena().getCapTable();
} else {
return nullptr;
}
}
// =======================================================================================
namespace {
uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
KJ_IF_MAYBE(s, sizeHint) {
// 1 for the root pointer. We don't store caps in the message so we don't count those here.
return s->wordCount + 1;
} else {
return SUGGESTED_FIRST_SEGMENT_WORDS;
}
}
} // namespace
LocalMessage::LocalMessage(kj::Maybe<MessageSize> sizeHint)
: message(firstSegmentSize(sizeHint)),
root(capContext.imbue(message.getRoot<AnyPointer>())) {}
// =======================================================================================
namespace {
class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
public:
BrokenPipeline(const kj::Exception& exception): exception(exception) {}
kj::Own<PipelineHook> addRef() override {
return kj::addRef(*this);
}
kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;
private:
kj::Exception exception;
};
class BrokenRequest final: public RequestHook {
public:
BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
: exception(exception), message(sizeHint) {}
RemotePromise<AnyPointer> send() override {
return RemotePromise<AnyPointer>(kj::cp(exception),
AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
}
const void* getBrand() {
return nullptr;
}
kj::Exception exception;
LocalMessage message;
};
class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
BrokenClient(const kj::Exception& exception): exception(exception) {}
BrokenClient(const kj::StringPtr description)
: exception(kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
"", 0, kj::str(description)) {}
Request<AnyPointer, AnyPointer> newCall(
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
auto hook = kj::heap<BrokenRequest>(exception, sizeHint);
auto root = hook->message.getRoot();
return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}
VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
kj::Own<CallContextHook>&& context) override {
return VoidPromiseAndPipeline { kj::cp(exception), kj::heap<BrokenPipeline>(exception) };
}
kj::Maybe<ClientHook&> getResolved() {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
}
kj::Own<ClientHook> addRef() override {
return kj::addRef(*this);
}
const void* getBrand() override {
return nullptr;
}
private:
kj::Exception exception;
};
kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
return kj::refcounted<BrokenClient>(exception);
}
} // namespace
kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
return kj::refcounted<BrokenClient>(reason);
}
kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
return kj::refcounted<BrokenClient>(kj::mv(reason));
}
kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}
} // namespace capnp
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Classes for imbuing message readers/builders with a capability context.
//
// These classes are for use by RPC implementations. Application code need not know about them.
//
// Normally, MessageReader and MessageBuilder do not support interface pointers because they
// are not RPC-aware and so have no idea how to convert between a serialized CapabilityDescriptor
// and a live capability. To fix this, a reader/builder object needs to be "imbued" with a
// capability context. This creates a new reader/builder which points at the same object but has
// the ability to deal with interface fields. Use `CapReaderContext` and `CapBuilderContext` to
// accomplish this.
#ifndef CAPNP_CAPABILITY_CONTEXT_H_
#define CAPNP_CAPABILITY_CONTEXT_H_
#include "layout.h"
#include "any.h"
#include "message.h"
#include <kj/mutex.h>
#include <kj/vector.h>
namespace kj { class Exception; }
namespace capnp {
class ClientHook;
// -------------------------------------------------------------------
class CapReaderContext {
// Class which can "imbue" reader objects from some other message with a capability context,
// so that interface pointers found in the message can be extracted and called.
//
// `imbue()` can only be called once per context.
public:
CapReaderContext(kj::Array<kj::Own<ClientHook>>&& capTable);
// `capTable` is the list of capabilities for this message.
~CapReaderContext() noexcept(false);
AnyPointer::Reader imbue(AnyPointer::Reader base);
private:
kj::Maybe<kj::Array<kj::Own<ClientHook>>> capTable; // becomes null once arena is allocated
void* arenaSpace[12 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
_::ImbuedReaderArena& arena() { return *reinterpret_cast<_::ImbuedReaderArena*>(arenaSpace); }
friend class _::ImbuedReaderArena;
};
class CapBuilderContext {
// Class which can "imbue" reader objects from some other message with a capability context,
// so that interface pointers found in the message can be set to point at live capabilities.
//
// `imbue()` can only be called once per context.
public:
CapBuilderContext();
~CapBuilderContext() noexcept(false);
AnyPointer::Builder imbue(AnyPointer::Builder base);
kj::ArrayPtr<kj::Own<ClientHook>> getCapTable();
// Return the table of capabilities injected into the message.
private:
bool arenaAllocated = false;
void* arenaSpace[15];
_::ImbuedBuilderArena& arena() { return *reinterpret_cast<_::ImbuedBuilderArena*>(arenaSpace); }
friend class _::ImbuedBuilderArena;
};
// -------------------------------------------------------------------
class LocalMessage final {
// An in-process message which can contain capabilities. Use in place of MallocMessageBuilder
// when you need to be able to construct a message in-memory that contains capabilities, and this
// message will never leave the process. You cannot serialize this message, since it doesn't
// know how to properly serialize its capabilities.
public:
explicit LocalMessage(kj::Maybe<MessageSize> sizeHint = nullptr);
template <typename T, typename = FromReader<T>>
inline LocalMessage(T&& reader): LocalMessage(reader.totalSize()) {
// Create a LocalMessage that is a copy of a given reader.
getRoot().setAs<FromReader<T>>(kj::fwd<T>(reader));
}
inline AnyPointer::Builder getRoot() { return root; }
inline AnyPointer::Reader getRootReader() const { return root.asReader(); }
private:
MallocMessageBuilder message;
CapBuilderContext capContext;
AnyPointer::Builder root;
};
kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason);
kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason);
// Helper function that creates a capability which simply throws exceptions when called.
kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason);
// Helper function that creates a pipeline which simply throws exceptions when called.
} // namespace capnp
#endif // CAPNP_CAPABILITY_CONTEXT_H_
......@@ -24,7 +24,6 @@
#define CAPNP_PRIVATE
#include "capability.h"
#include "capability-context.h"
#include "message.h"
#include "arena.h"
#include <kj/refcount.h>
......@@ -34,6 +33,37 @@
namespace capnp {
namespace _ {
void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.
} // namespace _
namespace {
class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
return capnp::newBrokenCap(description);
}
};
static BrokenCapFactoryImpl brokenCapFactory;
} // namespace
ClientHook::ClientHook() {
setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}
void MessageReader::initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable) {
setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
arena()->initCapTable(kj::mv(capTable));
}
// =======================================================================================
Capability::Client::Client(decltype(nullptr))
: hook(newBrokenCap("Called null capability.")) {}
......@@ -86,24 +116,32 @@ kj::Promise<void> ClientHook::whenResolved() {
// =======================================================================================
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
KJ_IF_MAYBE(s, sizeHint) {
return s->wordCount;
} else {
return SUGGESTED_FIRST_SEGMENT_WORDS;
}
}
class LocalResponse final: public ResponseHook, public kj::Refcounted {
public:
LocalResponse(kj::Maybe<MessageSize> sizeHint)
: message(sizeHint) {}
: message(firstSegmentSize(sizeHint)) {}
LocalMessage message;
MallocMessageBuilder message;
};
class LocalCallContext final: public CallContextHook, public kj::Refcounted {
public:
LocalCallContext(kj::Own<LocalMessage>&& request, kj::Own<ClientHook> clientRef,
LocalCallContext(kj::Own<MallocMessageBuilder>&& request, kj::Own<ClientHook> clientRef,
kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller)
: request(kj::mv(request)), clientRef(kj::mv(clientRef)),
cancelAllowedFulfiller(kj::mv(cancelAllowedFulfiller)) {}
AnyPointer::Reader getParams() override {
KJ_IF_MAYBE(r, request) {
return r->get()->getRoot();
return r->get()->getRoot<AnyPointer>();
} else {
KJ_FAIL_REQUIRE("Can't call getParams() after releaseParams().");
}
......@@ -114,7 +152,7 @@ public:
AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
if (response == nullptr) {
auto localResponse = kj::refcounted<LocalResponse>(sizeHint);
responseBuilder = localResponse->message.getRoot();
responseBuilder = localResponse->message.getRoot<AnyPointer>();
response = Response<AnyPointer>(responseBuilder.asReader(), kj::mv(localResponse));
}
return responseBuilder;
......@@ -149,7 +187,7 @@ public:
return kj::addRef(*this);
}
kj::Maybe<kj::Own<LocalMessage>> request;
kj::Maybe<kj::Own<MallocMessageBuilder>> request;
kj::Maybe<Response<AnyPointer>> response;
AnyPointer::Builder responseBuilder = nullptr; // only valid if `response` is non-null
kj::Own<ClientHook> clientRef;
......@@ -161,7 +199,7 @@ class LocalRequest final: public RequestHook {
public:
inline LocalRequest(uint64_t interfaceId, uint16_t methodId,
kj::Maybe<MessageSize> sizeHint, kj::Own<ClientHook> client)
: message(kj::heap<LocalMessage>(sizeHint)),
: message(kj::heap<MallocMessageBuilder>(firstSegmentSize(sizeHint))),
interfaceId(interfaceId), methodId(methodId), client(kj::mv(client)) {}
RemotePromise<AnyPointer> send() override {
......@@ -204,7 +242,7 @@ public:
return nullptr;
}
kj::Own<LocalMessage> message;
kj::Own<MallocMessageBuilder> message;
private:
uint64_t interfaceId;
......@@ -274,7 +312,7 @@ public:
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
auto hook = kj::heap<LocalRequest>(
interfaceId, methodId, sizeHint, kj::addRef(*this));
auto root = hook->message->getRoot(); // Do not inline `root` -- kj::mv may happen first.
auto root = hook->message->getRoot<AnyPointer>();
return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}
......@@ -424,7 +462,7 @@ public:
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
auto hook = kj::heap<LocalRequest>(
interfaceId, methodId, sizeHint, kj::addRef(*this));
auto root = hook->message->getRoot(); // Do not inline `root` -- kj::mv may happen first.
auto root = hook->message->getRoot<AnyPointer>();
return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}
......@@ -495,4 +533,97 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro
return kj::refcounted<QueuedClient>(kj::mv(promise));
}
// =======================================================================================
namespace {
class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
public:
BrokenPipeline(const kj::Exception& exception): exception(exception) {}
kj::Own<PipelineHook> addRef() override {
return kj::addRef(*this);
}
kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;
private:
kj::Exception exception;
};
class BrokenRequest final: public RequestHook {
public:
BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
: exception(exception), message(firstSegmentSize(sizeHint)) {}
RemotePromise<AnyPointer> send() override {
return RemotePromise<AnyPointer>(kj::cp(exception),
AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
}
const void* getBrand() {
return nullptr;
}
kj::Exception exception;
MallocMessageBuilder message;
};
class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
BrokenClient(const kj::Exception& exception): exception(exception) {}
BrokenClient(const kj::StringPtr description)
: exception(kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
"", 0, kj::str(description)) {}
Request<AnyPointer, AnyPointer> newCall(
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
auto hook = kj::heap<BrokenRequest>(exception, sizeHint);
auto root = hook->message.getRoot<AnyPointer>();
return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}
VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
kj::Own<CallContextHook>&& context) override {
return VoidPromiseAndPipeline { kj::cp(exception), kj::heap<BrokenPipeline>(exception) };
}
kj::Maybe<ClientHook&> getResolved() {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
}
kj::Own<ClientHook> addRef() override {
return kj::addRef(*this);
}
const void* getBrand() override {
return nullptr;
}
private:
kj::Exception exception;
};
kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
return kj::refcounted<BrokenClient>(exception);
}
} // namespace
kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
return kj::refcounted<BrokenClient>(reason);
}
kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
return kj::refcounted<BrokenClient>(kj::mv(reason));
}
kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}
} // namespace capnp
......@@ -332,6 +332,8 @@ public:
class ClientHook {
public:
ClientHook();
virtual Request<AnyPointer, AnyPointer> newCall(
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) = 0;
// Start a new call, allowing the client to allocate request/response objects as it sees fit.
......@@ -410,6 +412,13 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro
// the new client. This hook's `getResolved()` and `whenMoreResolved()` methods will reflect the
// redirection to the eventual replacement client.
kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason);
kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason);
// Helper function that creates a capability which simply throws exceptions when called.
kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason);
// Helper function that creates a pipeline which simply throws exceptions when called.
// =======================================================================================
// Extend PointerHelpers for interfaces
......
......@@ -79,6 +79,16 @@ class EzRpcClient {
// EzRpcClient / EzRpcServer objects in a single thread; they will make sure to make no more
// than one EventLoop.)
// - These classes only support simple two-party connections, not multilateral VatNetworks.
// - These classes only support communication over a raw, unencrypted socket. If you want to
// build on an abstract stream (perhaps one which supports encryption), you must use the
// lower-level interfaces.
//
// Some of these restrictions will probably be lifted in future versions, but some things will
// always require using the low-level interfaces directly. If you are interested in working
// at a lower level, start by looking at these interfaces:
// - `kj::startAsyncIo()` in `kj/async-io.h`.
// - `RpcSystem` in `capnp/rpc.h`.
// - `TwoPartyVatNetwork` in `capnp/rpc-twoparty.h`.
public:
explicit EzRpcClient(kj::StringPtr serverAddress, uint defaultPort = 0);
......
......@@ -279,7 +279,7 @@ static void checkStruct(StructReader reader) {
TEST(WireFormat, StructRoundTrip_OneSegment) {
MallocMessageBuilder message;
BasicBuilderArena arena(&message);
BuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......@@ -316,7 +316,7 @@ TEST(WireFormat, StructRoundTrip_OneSegment) {
TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
MallocMessageBuilder message(0, AllocationStrategy::FIXED_SIZE);
BasicBuilderArena arena(&message);
BuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......@@ -354,7 +354,7 @@ TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
TEST(WireFormat, StructRoundTrip_MultipleSegmentsWithMultipleAllocations) {
MallocMessageBuilder message(8, AllocationStrategy::FIXED_SIZE);
BasicBuilderArena arena(&message);
BuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......
......@@ -37,8 +37,8 @@ static BrokenCapFactory* brokenCapFactory = nullptr;
// but we can't have a link-time dependency on libcapnp-rpc.
void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory) {
// Called from capability-context.c++ when a capability context is created. May be called
// multiple times but always with the same value.
// Called from capability.c++ when the capability API is used, to make sure that layout.c++
// is ready for it. May be called multiple times but always with the same value.
__atomic_store_n(&brokenCapFactory, &factory, __ATOMIC_RELAXED);
}
......@@ -1725,8 +1725,7 @@ struct WireHelpers {
setCapabilityPointer(dstSegment, dst, kj::mv(*cap), orphanArena);
return { dstSegment, nullptr };
} else {
KJ_FAIL_REQUIRE("Message contained capability pointer but is not imbued with a "
"capability context.") {
KJ_FAIL_REQUIRE("Message contained invalid capability pointer.") {
goto useDefault;
}
}
......@@ -1846,28 +1845,21 @@ struct WireHelpers {
"use the Cap'n Proto RPC system.");
if (ref->isNull()) {
maybeCap = brokenCapFactory->newBrokenCap("Calling null capability pointer.");
return brokenCapFactory->newBrokenCap("Calling null capability pointer.");
} else if (!ref->isCapability()) {
KJ_FAIL_REQUIRE(
"Message contains non-capability pointer where capability pointer was expected.") {
break;
}
maybeCap = brokenCapFactory->newBrokenCap(
return brokenCapFactory->newBrokenCap(
"Calling capability extracted from a non-capability pointer.");
} else {
maybeCap = segment->getArena()->extractCap(ref->capRef.index.get());
}
KJ_IF_MAYBE(cap, maybeCap) {
} else KJ_IF_MAYBE(cap, segment->getArena()->extractCap(ref->capRef.index.get())) {
return kj::mv(*cap);
} else {
// The message is not imbued with a capability context. We can't really recover from this,
// because we have no way to construct a ClientHook in this case -- capability.c++ may not
// even be linked in. Luckily, this is the caller's error, not the message sender's --
// it's the message reader who is calling a capability getter on a message they should know
// they have not imbued properly.
KJ_FAIL_REQUIRE("Tried to read a capability out of a message that doesn't have a "
"capability context.");
KJ_FAIL_REQUIRE("Message contains invalid capability pointer.") {
break;
}
return brokenCapFactory->newBrokenCap("Calling invalid capability pointer.");
}
}
......@@ -2216,10 +2208,6 @@ BuilderArena* PointerBuilder::getArena() const {
return segment->getArena();
}
PointerBuilder PointerBuilder::imbue(ImbuedBuilderArena& newArena) const {
return PointerBuilder(newArena.imbue(segment), pointer);
}
// =======================================================================================
// PointerReader
......@@ -2278,10 +2266,6 @@ kj::Maybe<Arena&> PointerReader::getArena() const {
return segment == nullptr ? nullptr : segment->getArena();
}
PointerReader PointerReader::imbue(ImbuedReaderArena& newArena) const {
return PointerReader(newArena.imbue(segment), pointer, nestingLimit);
}
// =======================================================================================
// StructBuilder
......
......@@ -56,8 +56,6 @@ class SegmentReader;
class SegmentBuilder;
class Arena;
class BuilderArena;
class ImbuedReaderArena;
class ImbuedBuilderArena;
// =============================================================================
......@@ -341,9 +339,6 @@ public:
BuilderArena* getArena() const;
// Get the arena containing this pointer.
PointerBuilder imbue(ImbuedBuilderArena& newArena) const;
// Imbue the pointer with a capability context, returning the imbued pointer.
private:
SegmentBuilder* segment; // Memory segment in which the pointer resides.
WirePointer* pointer; // Pointer to the pointer.
......@@ -392,9 +387,6 @@ public:
kj::Maybe<Arena&> getArena() const;
// Get the arena containing this pointer.
PointerReader imbue(ImbuedReaderArena& newArena) const;
// Imbue the pointer with a capability context, returning the imbued pointer.
private:
SegmentReader* segment; // Memory segment in which the pointer resides.
const WirePointer* pointer; // Pointer to the pointer. null = treat as null pointer.
......@@ -473,10 +465,6 @@ public:
BuilderArena* getArena();
// Gets the arena in which this object is allocated.
void unimbue();
// Removes the capability context from the builder. This means replacing the segment pointer --
// which is assumed to point to an ImbuedSegmentBuilder -- with the non-imbued base segment.
private:
SegmentBuilder* segment; // Memory segment in which the struct resides.
void* data; // Pointer to the encoded data.
......@@ -542,10 +530,6 @@ public:
// use the result as a hint for allocating the first segment, do the copy, and then throw an
// exception if it overruns.
void unimbue();
// Removes the capability context from the reader. This means replacing the segment pointer --
// which is assumed to point to an ImbuedSegmentReader -- with the non-imbued base segment.
private:
SegmentReader* segment; // Memory segment in which the struct resides.
......
......@@ -38,16 +38,16 @@ namespace capnp {
MessageReader::MessageReader(ReaderOptions options): options(options), allocatedArena(false) {}
MessageReader::~MessageReader() noexcept(false) {
if (allocatedArena) {
arena()->~BasicReaderArena();
arena()->~ReaderArena();
}
}
AnyPointer::Reader MessageReader::getRootInternal() {
if (!allocatedArena) {
static_assert(sizeof(_::BasicReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BasicReaderArena. Please increase it. This will break "
static_assert(sizeof(_::ReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a ReaderArena. Please increase it. This will break "
"ABI compatibility.");
new(arena()) _::BasicReaderArena(this);
new(arena()) _::ReaderArena(this);
allocatedArena = true;
}
......@@ -75,7 +75,7 @@ _::SegmentBuilder* MessageBuilder::getRootSegment() {
if (allocatedArena) {
return arena()->getSegment(_::SegmentId(0));
} else {
static_assert(sizeof(_::BasicBuilderArena) <= sizeof(arenaSpace),
static_assert(sizeof(_::BuilderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BuilderArena. Please increase it.");
kj::ctor(*arena(), this);
allocatedArena = true;
......@@ -104,6 +104,14 @@ kj::ArrayPtr<const kj::ArrayPtr<const word>> MessageBuilder::getSegmentsForOutpu
}
}
kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> MessageBuilder::getCapTable() {
if (allocatedArena) {
return arena()->getCapTable();
} else {
return nullptr;
}
}
Orphanage MessageBuilder::getOrphanage() {
// We must ensure that the arena and root pointer have been allocated before the Orphanage
// can be used.
......
......@@ -34,8 +34,8 @@
namespace capnp {
namespace _ { // private
class BasicReaderArena;
class BasicBuilderArena;
class ReaderArena;
class BuilderArena;
}
class StructSchema;
......@@ -118,6 +118,16 @@ public:
// RootType in this case must be DynamicStruct, and you must #include <capnp/dynamic.h> to
// use this.
void initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable);
// Sets the table of capabilities embedded in this message. Capability pointers found in the
// message content contain indexes into this table. You must call this before attempting to
// read any capability pointers (interface pointers) from the message. The table is not passed
// to the constructor because often (as in the RPC system) the cap table is actually constructed
// based on a list read from the message itself.
//
// You must link against libcapnp-rpc to call this method (the rest of MessageBuilder is in
// regular libcapnp).
private:
ReaderOptions options;
......@@ -128,7 +138,7 @@ private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
bool allocatedArena;
_::BasicReaderArena* arena() { return reinterpret_cast<_::BasicReaderArena*>(arenaSpace); }
_::ReaderArena* arena() { return reinterpret_cast<_::ReaderArena*>(arenaSpace); }
AnyPointer::Reader getRootInternal();
};
......@@ -186,11 +196,18 @@ public:
// Like setRoot() but adopts the orphan without copying.
kj::ArrayPtr<const kj::ArrayPtr<const word>> getSegmentsForOutput();
// Get the raw data that makes up the message.
kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> getCapTable();
// Get the table of capabilities (interface pointers) that have been added to this message.
// When you later parse this message, you must call `initCapTable()` on the `MessageReader` and
// give it an equivalent set of capabilities, otherwise cap pointers in the message will be
// unusable.
Orphanage getOrphanage();
private:
void* arenaSpace[16];
void* arenaSpace[17];
// Space in which we can construct a BuilderArena. We don't use BuilderArena directly here
// because we don't want clients to have to #include arena.h, which itself includes a bunch of
// big STL headers. We don't use a pointer to a BuilderArena because that would require an
......@@ -203,7 +220,7 @@ private:
// isn't constructed yet. This is kind of annoying because it means that getOrphanage() is
// not thread-safe, but that shouldn't be a huge deal...
_::BasicBuilderArena* arena() { return reinterpret_cast<_::BasicBuilderArena*>(arenaSpace); }
_::BuilderArena* arena() { return reinterpret_cast<_::BuilderArena*>(arenaSpace); }
_::SegmentBuilder* getRootSegment();
AnyPointer::Builder getRootInternal();
};
......
......@@ -22,9 +22,9 @@
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "rpc.h"
#include "capability-context.h"
#include "test-util.h"
#include "schema.h"
#include "serialize.h"
#include <kj/debug.h>
#include <kj/string-tree.h>
#include <gtest/gtest.h>
......@@ -83,14 +83,7 @@ public:
}
auto payload = call.getParams();
auto capTableReader = payload.getCapTable();
auto capTable = kj::heapArrayBuilder<kj::Own<ClientHook>>(capTableReader.size());
for (uint i = 0; i < capTableReader.size(); i++) {
capTable.add(newBrokenCap("fake cap"));
}
CapReaderContext context(capTable.finish());
auto params = kj::str(context.imbue(payload.getContent()).getAs<DynamicStruct>(paramType));
auto params = kj::str(payload.getContent().getAs<DynamicStruct>(paramType));
auto sendResultsTo = call.getSendResultsTo();
......@@ -120,22 +113,14 @@ public:
}
auto payload = ret.getResults();
auto capTableReader = payload.getCapTable();
auto capTable = kj::heapArrayBuilder<kj::Own<ClientHook>>(capTableReader.size());
for (uint i = 0; i < capTableReader.size(); i++) {
capTable.add(newBrokenCap("fake cap"));
}
CapReaderContext context(capTable.finish());
auto imbued = context.imbue(payload.getContent());
if (schema.getProto().isStruct()) {
auto results = kj::str(imbued.getAs<DynamicStruct>(schema.asStruct()));
auto results = kj::str(payload.getContent().getAs<DynamicStruct>(schema.asStruct()));
return kj::str(senderName, "(", ret.getAnswerId(), "): return ", results,
" caps:[", kj::strArray(payload.getCapTable(), ", "), "]");
} else if (schema.getProto().isInterface()) {
imbued.getAs<DynamicCapability>(schema.asInterface());
payload.getContent().getAs<DynamicCapability>(schema.asInterface());
return kj::str(senderName, "(", ret.getAnswerId(), "): return cap ",
kj::strArray(payload.getCapTable(), ", "));
} else {
......@@ -246,26 +231,37 @@ public:
class IncomingRpcMessageImpl final: public IncomingRpcMessage, public kj::Refcounted {
public:
IncomingRpcMessageImpl(uint firstSegmentWordSize)
: message(firstSegmentWordSize == 0 ? SUGGESTED_FIRST_SEGMENT_WORDS
: firstSegmentWordSize) {}
IncomingRpcMessageImpl(kj::Array<word> data)
: data(kj::mv(data)),
message(this->data) {}
AnyPointer::Reader getBody() override {
return message.getRoot<AnyPointer>().asReader();
return message.getRoot<AnyPointer>();
}
MallocMessageBuilder message;
void initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>>&& capTable) override {
message.initCapTable(kj::mv(capTable));
}
kj::Array<word> data;
FlatArrayMessageReader message;
};
class OutgoingRpcMessageImpl final: public OutgoingRpcMessage {
public:
OutgoingRpcMessageImpl(ConnectionImpl& connection, uint firstSegmentWordSize)
: connection(connection),
message(kj::refcounted<IncomingRpcMessageImpl>(firstSegmentWordSize)) {}
message(firstSegmentWordSize == 0 ? SUGGESTED_FIRST_SEGMENT_WORDS
: firstSegmentWordSize) {}
AnyPointer::Builder getBody() override {
return message->message.getRoot<AnyPointer>();
return message.getRoot<AnyPointer>();
}
kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> getCapTable() override {
return message.getCapTable();
}
void send() override {
if (connection.networkException != nullptr) {
return;
......@@ -275,11 +271,13 @@ public:
// Uncomment to get a debug dump.
// kj::String msg = connection.network.network.dumper.dump(
// message->message.getRoot<rpc::Message>(), connection.sender);
// message.getRoot<rpc::Message>(), connection.sender);
// KJ_ DBG(msg);
auto incomingMessage = kj::heap<IncomingRpcMessageImpl>(messageToFlatArray(message));
auto connectionPtr = &connection;
connection.tasks->add(kj::evalLater(kj::mvCapture(kj::addRef(*message),
connection.tasks->add(kj::evalLater(kj::mvCapture(incomingMessage,
[connectionPtr](kj::Own<IncomingRpcMessageImpl>&& message) {
KJ_IF_MAYBE(p, connectionPtr->partner) {
if (p->fulfillers.empty()) {
......@@ -296,7 +294,7 @@ public:
private:
ConnectionImpl& connection;
kj::Own<IncomingRpcMessageImpl> message;
MallocMessageBuilder message;
};
kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override {
......
......@@ -22,7 +22,6 @@
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "rpc-twoparty.h"
#include "capability-context.h"
#include "test-util.h"
#include <kj/async-unix.h>
#include <kj/debug.h>
......
......@@ -76,6 +76,10 @@ public:
return message.getRoot<AnyPointer>();
}
kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> getCapTable() override {
return message.getCapTable();
}
void send() override {
network.previousWrite = network.previousWrite.then([&]() {
auto promise = writeMessage(network.stream, message).then([]() {
......@@ -101,6 +105,10 @@ public:
return message->getRoot<AnyPointer>();
}
void initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>>&& capTable) override {
message->initCapTable(kj::mv(capTable));
}
private:
kj::Own<MessageReader> message;
};
......
......@@ -22,7 +22,7 @@
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "rpc.h"
#include "capability-context.h"
#include "message.h"
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
......@@ -910,13 +910,17 @@ private:
}
}
kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Own<ClientHook>> capTable,
kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
rpc::Payload::Builder payload) {
auto capTableBuilder = payload.initCapTable(capTable.size());
kj::Vector<ExportId> exports(capTable.size());
for (uint i: kj::indices(capTable)) {
KJ_IF_MAYBE(exportId, writeDescriptor(*capTable[i], capTableBuilder[i])) {
exports.add(*exportId);
KJ_IF_MAYBE(cap, capTable[i]) {
KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
exports.add(*exportId);
}
} else {
capTableBuilder[i].setNone();
}
}
return exports.releaseAsArray();
......@@ -1070,10 +1074,10 @@ private:
}
}
kj::Own<ClientHook> receiveCap(rpc::CapDescriptor::Reader descriptor) {
kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
switch (descriptor.which()) {
case rpc::CapDescriptor::NONE:
return newBrokenCap("Called a `CapDescriptor.none`.");
return nullptr;
case rpc::CapDescriptor::SENDER_HOSTED:
return import(descriptor.getSenderHosted(), false);
......@@ -1115,8 +1119,8 @@ private:
}
}
kj::Array<kj::Own<ClientHook>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
auto result = kj::heapArrayBuilder<kj::Own<ClientHook>>(capTable.size());
kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
for (auto cap: capTable) {
result.add(receiveCap(cap));
}
......@@ -1194,7 +1198,7 @@ private:
firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
paramsBuilder(context.imbue(callBuilder.getParams().getContent())) {}
paramsBuilder(callBuilder.getParams().getContent()) {}
inline AnyPointer::Builder getRoot() {
return paramsBuilder;
......@@ -1288,7 +1292,6 @@ private:
kj::Own<RpcClient> target;
kj::Own<OutgoingRpcMessage> message;
CapBuilderContext context;
rpc::Call::Builder callBuilder;
AnyPointer::Builder paramsBuilder;
......@@ -1300,7 +1303,7 @@ private:
SendInternalResult sendInternal(bool isTailCall) {
// Build the cap table.
auto exports = connectionState->writeDescriptors(
context.getCapTable(), callBuilder.getParams());
message->getCapTable(), callBuilder.getParams());
// Init the question table. Do this after writing descriptors to avoid interference.
QuestionId questionId;
......@@ -1432,12 +1435,10 @@ private:
RpcResponseImpl(RpcConnectionState& connectionState,
kj::Own<QuestionRef>&& questionRef,
kj::Own<IncomingRpcMessage>&& message,
AnyPointer::Reader results,
kj::Array<kj::Own<ClientHook>>&& capTable)
AnyPointer::Reader results)
: connectionState(kj::addRef(connectionState)),
message(kj::mv(message)),
context(kj::mv(capTable)),
reader(context.imbue(results)),
reader(results),
questionRef(kj::mv(questionRef)) {}
AnyPointer::Reader getResults() override {
......@@ -1451,7 +1452,6 @@ private:
private:
kj::Own<RpcConnectionState> connectionState;
kj::Own<IncomingRpcMessage> message;
CapReaderContext context;
AnyPointer::Reader reader;
kj::Own<QuestionRef> questionRef;
};
......@@ -1471,11 +1471,10 @@ private:
rpc::Payload::Builder payload)
: connectionState(connectionState),
message(kj::mv(message)),
payload(payload),
builder(context.imbue(payload.getContent())) {}
payload(payload) {}
AnyPointer::Builder getResultsBuilder() override {
return builder;
return payload.getContent();
}
kj::Maybe<kj::Array<ExportId>> send() {
......@@ -1483,7 +1482,7 @@ private:
// (Could return a non-null empty array if there were caps but none of them were exports.)
// Build the cap table.
auto capTable = context.getCapTable();
auto capTable = message->getCapTable();
auto exports = connectionState.writeDescriptors(capTable, payload);
message->send();
......@@ -1497,23 +1496,22 @@ private:
private:
RpcConnectionState& connectionState;
kj::Own<OutgoingRpcMessage> message;
CapBuilderContext context;
rpc::Payload::Builder payload;
AnyPointer::Builder builder;
};
class LocallyRedirectedRpcResponse final
: public RpcResponse, public RpcServerResponse, public kj::Refcounted{
public:
LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
: message(sizeHint) {}
: message(sizeHint.map([](MessageSize size) { return size.wordCount; })
.orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
AnyPointer::Builder getResultsBuilder() override {
return message.getRoot();
return message.getRoot<AnyPointer>();
}
AnyPointer::Reader getResults() override {
return message.getRootReader();
return message.getRoot<AnyPointer>();
}
kj::Own<RpcResponse> addRef() override {
......@@ -1521,20 +1519,18 @@ private:
}
private:
LocalMessage message;
MallocMessageBuilder message;
};
class RpcCallContext final: public CallContextHook, public kj::Refcounted {
public:
RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
kj::Own<IncomingRpcMessage>&& request, const AnyPointer::Reader& params,
kj::Array<kj::Own<ClientHook>>&& requestCapTable, bool redirectResults,
kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
: connectionState(kj::addRef(connectionState)),
answerId(answerId),
request(kj::mv(request)),
requestCapContext(kj::mv(requestCapTable)),
params(requestCapContext.imbue(params)),
params(params),
returnMessage(nullptr),
redirectResults(redirectResults),
cancelFulfiller(kj::mv(cancelFulfiller)) {}
......@@ -1740,7 +1736,6 @@ private:
// Request ---------------------------------------------
kj::Maybe<kj::Own<IncomingRpcMessage>> request;
CapReaderContext requestCapContext;
AnyPointer::Reader params;
// Response --------------------------------------------
......@@ -1938,14 +1933,14 @@ private:
}
auto payload = call.getParams();
auto capTable = receiveCaps(payload.getCapTable());
message->initCapTable(receiveCaps(payload.getCapTable()));
auto cancelPaf = kj::newPromiseAndFulfiller<void>();
AnswerId answerId = call.getQuestionId();
auto context = kj::refcounted<RpcCallContext>(
*this, answerId, kj::mv(message), payload.getContent(),
kj::mv(capTable), redirectResults, kj::mv(cancelPaf.fulfiller));
redirectResults, kj::mv(cancelPaf.fulfiller));
// No more using `call` after this point, as it now belongs to the context.
......@@ -2080,9 +2075,9 @@ private:
}
auto payload = ret.getResults();
message->initCapTable(receiveCaps(payload.getCapTable()));
questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
*this, kj::addRef(*questionRef), kj::mv(message), payload.getContent(),
receiveCaps(payload.getCapTable())));
*this, kj::addRef(*questionRef), kj::mv(message), payload.getContent()));
break;
}
......@@ -2184,7 +2179,11 @@ private:
// Extract the replacement capability.
switch (resolve.which()) {
case rpc::Resolve::CAP:
replacement = receiveCap(resolve.getCap());
KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
replacement = kj::mv(*cap);
} else {
KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
}
break;
case rpc::Resolve::EXCEPTION:
......@@ -2347,8 +2346,6 @@ private:
rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
ret.setAnswerId(answerId);
CapBuilderContext context;
kj::Own<ClientHook> capHook;
kj::Array<ExportId> resultExports;
KJ_DEFER(releaseExports(resultExports)); // in case something goes wrong
......@@ -2358,13 +2355,12 @@ private:
KJ_IF_MAYBE(r, restorer) {
Capability::Client cap = r->baseRestore(restore.getObjectId());
auto payload = ret.initResults();
auto results = context.imbue(payload.getContent());
results.setAs<Capability>(cap);
payload.getContent().setAs<Capability>(kj::mv(cap));
auto capTable = context.getCapTable();
auto capTable = response->getCapTable();
KJ_DASSERT(capTable.size() == 1);
resultExports = writeDescriptors(capTable, payload);
capHook = capTable[0]->addRef();
capHook = KJ_ASSERT_NONNULL(capTable[0])->addRef();
} else {
KJ_FAIL_REQUIRE("This vat cannot restore this SturdyRef.") { break; }
}
......
......@@ -126,6 +126,9 @@ public:
// Get the message body, which the caller may fill in any way it wants. (The standard RPC
// implementation initializes it as a Message as defined in rpc.capnp.)
virtual kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> getCapTable() = 0;
// Calls getCapTable() on the underlying MessageBuilder.
virtual void send() = 0;
// Send the message, or at least put it in a queue to be sent later. Note that the builder
// returned by `getBody()` remains valid at least until the `OutgoingRpcMessage` is destroyed.
......@@ -138,6 +141,9 @@ public:
virtual AnyPointer::Reader getBody() = 0;
// Get the message body, to be interpreted by the caller. (The standard RPC implementation
// interprets it as a Message as defined in rpc.capnp.)
virtual void initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>>&& capTable) = 0;
// Calls initCapTable() on the underlying MessageReader.
};
template <typename SturdyRefHostId, typename ProvisionId, typename RecipientId,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment