Commit 398dac0d authored by Kenton Varda's avatar Kenton Varda

Capabilities core API WIP.

parent c0fc7ba9
......@@ -33,6 +33,7 @@ namespace capnp {
namespace _ { // private
Arena::~Arena() noexcept(false) {}
BuilderArena::~BuilderArena() noexcept(false) {}
void ReadLimiter::unread(WordCount64 amount) {
// Be careful not to overflow here. Since ReadLimiter has no thread-safety, it's possible that
......@@ -47,14 +48,14 @@ void ReadLimiter::unread(WordCount64 amount) {
// =======================================================================================
ReaderArena::ReaderArena(MessageReader* message)
BasicReaderArena::BasicReaderArena(MessageReader* message)
: message(message),
readLimiter(message->getOptions().traversalLimitInWords * WORDS),
segment0(this, SegmentId(0), message->getSegment(0), &readLimiter) {}
ReaderArena::~ReaderArena() noexcept(false) {}
BasicReaderArena::~BasicReaderArena() noexcept(false) {}
SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
SegmentReader* BasicReaderArena::tryGetSegment(SegmentId id) {
if (id == SegmentId(0)) {
if (segment0.getArray() == nullptr) {
return nullptr;
......@@ -69,7 +70,7 @@ SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
KJ_IF_MAYBE(s, *lock) {
auto iter = s->find(id.value);
if (iter != s->end()) {
return iter->second.get();
return iter->second;
}
segments = s;
}
......@@ -83,7 +84,7 @@ SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
// OK, the segment exists, so allocate the map.
auto s = kj::heap<SegmentMap>();
segments = s;
*lock = mv(s);
*lock = kj::mv(s);
}
auto segment = kj::heap<SegmentReader>(this, id, newSegment, &readLimiter);
......@@ -92,7 +93,7 @@ SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
return result;
}
void ReaderArena::reportReadLimitReached() {
void BasicReaderArena::reportReadLimitReached() {
KJ_FAIL_REQUIRE("Exceeded message traversal limit. See capnp::ReaderOptions.") {
return;
}
......@@ -100,11 +101,61 @@ void ReaderArena::reportReadLimitReached() {
// =======================================================================================
BuilderArena::BuilderArena(MessageBuilder* message)
ImbuedReaderArena::ImbuedReaderArena(Arena* base)
: base(base), segment0(nullptr, SegmentId(0), nullptr, nullptr) {}
ImbuedReaderArena::~ImbuedReaderArena() noexcept(false) {}
SegmentReader* ImbuedReaderArena::imbue(SegmentReader* baseSegment) {
if (baseSegment == nullptr) return nullptr;
if (baseSegment->getSegmentId() == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
kj::dtor(segment0);
kj::ctor(segment0, this, *baseSegment);
}
KJ_DASSERT(segment0.getArray().begin() == baseSegment->getArray().begin());
return &segment0;
}
auto lock = moreSegments.lockExclusive();
SegmentMap* segments = nullptr;
KJ_IF_MAYBE(s, *lock) {
auto iter = s->find(baseSegment);
if (iter != s->end()) {
KJ_DASSERT(iter->second->getArray().begin() == baseSegment->getArray().begin());
return iter->second;
}
segments = s;
} else {
auto newMap = kj::heap<SegmentMap>();
segments = newMap;
*lock = kj::mv(newMap);
}
auto newSegment = kj::heap<SegmentReader>(this, *baseSegment);
SegmentReader* result = newSegment;
segments->insert(std::make_pair(baseSegment, mv(newSegment)));
return result;
}
// implements Arena ------------------------------------------------
SegmentReader* ImbuedReaderArena::tryGetSegment(SegmentId id) {
return imbue(base->tryGetSegment(id));
}
void ImbuedReaderArena::reportReadLimitReached() {
return base->reportReadLimitReached();
}
// =======================================================================================
BasicBuilderArena::BasicBuilderArena(MessageBuilder* message)
: message(message), segment0(nullptr, SegmentId(0), nullptr, nullptr) {}
BuilderArena::~BuilderArena() noexcept(false) {}
BasicBuilderArena::~BasicBuilderArena() noexcept(false) {}
SegmentBuilder* BuilderArena::getSegment(SegmentId id) {
SegmentBuilder* BasicBuilderArena::getSegment(SegmentId id) {
// This method is allowed to fail if the segment ID is not valid.
if (id == SegmentId(0)) {
return &segment0;
......@@ -114,14 +165,14 @@ SegmentBuilder* BuilderArena::getSegment(SegmentId id) {
KJ_REQUIRE(id.value - 1 < s->builders.size(), "invalid segment id", id.value);
// TODO(cleanup): Return a const SegmentBuilder and tediously constify all SegmentBuilder
// pointers throughout the codebase.
return const_cast<SegmentBuilder*>(s->builders[id.value - 1].get());
return const_cast<BasicSegmentBuilder*>(s->builders[id.value - 1].get());
} else {
KJ_FAIL_REQUIRE("invalid segment id", id.value);
}
}
}
BuilderArena::AllocateResult BuilderArena::allocate(WordCount amount) {
BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount) {
if (segment0.getArena() == nullptr) {
// We're allocating the first segment. We don't need to worry about threads at this point
// because calling MessageBuilder::initRoot() from multiple threads is not intended to be safe.
......@@ -162,7 +213,7 @@ BuilderArena::AllocateResult BuilderArena::allocate(WordCount amount) {
*lock = kj::mv(newSegmentState);
}
kj::Own<SegmentBuilder> newBuilder = kj::heap<SegmentBuilder>(
kj::Own<BasicSegmentBuilder> newBuilder = kj::heap<BasicSegmentBuilder>(
this, SegmentId(segmentState->builders.size() + 1),
message->allocateSegment(amount / WORDS), &this->dummyLimiter);
SegmentBuilder* result = newBuilder.get();
......@@ -178,7 +229,7 @@ BuilderArena::AllocateResult BuilderArena::allocate(WordCount amount) {
}
}
kj::ArrayPtr<const kj::ArrayPtr<const word>> BuilderArena::getSegmentsForOutput() {
kj::ArrayPtr<const kj::ArrayPtr<const word>> BasicBuilderArena::getSegmentsForOutput() {
// We shouldn't need to lock a mutex here because if this is called multiple times simultaneously,
// we should only be overwriting the array with the exact same data. If the number or size of
// segments is actually changing due to an activity in another thread, then the caller has a
......@@ -209,7 +260,7 @@ kj::ArrayPtr<const kj::ArrayPtr<const word>> BuilderArena::getSegmentsForOutput(
}
}
SegmentReader* BuilderArena::tryGetSegment(SegmentId id) {
SegmentReader* BasicBuilderArena::tryGetSegment(SegmentId id) {
if (id == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
// We haven't allocated any segments yet.
......@@ -231,12 +282,67 @@ SegmentReader* BuilderArena::tryGetSegment(SegmentId id) {
}
}
void BuilderArena::reportReadLimitReached() {
KJ_FAIL_ASSERT(
"Read limit reached for BuilderArena, but it should have been unlimited.") {
void BasicBuilderArena::reportReadLimitReached() {
KJ_FAIL_ASSERT("Read limit reached for BuilderArena, but it should have been unlimited.") {
return;
}
}
// =======================================================================================
ImbuedBuilderArena::ImbuedBuilderArena(BuilderArena* base)
: base(base), segment0(nullptr) {}
ImbuedBuilderArena::~ImbuedBuilderArena() noexcept(false) {}
SegmentBuilder* ImbuedBuilderArena::imbue(SegmentBuilder* baseSegment) {
if (baseSegment == nullptr) return nullptr;
SegmentBuilder* result;
if (baseSegment->getSegmentId() == SegmentId(0)) {
if (segment0.getArena() == nullptr) {
kj::dtor(segment0);
kj::ctor(segment0, baseSegment);
}
result = &segment0;
} else {
auto lock = moreSegments.lockExclusive();
KJ_IF_MAYBE(segmentState, *lock) {
auto id = baseSegment->getSegmentId().value;
if (id >= segmentState->builders.size()) {
segmentState->builders.resize(id + 1);
}
KJ_IF_MAYBE(segment, segmentState->builders[id]) {
result = segment;
} else {
auto newBuilder = kj::heap<ImbuedSegmentBuilder>(baseSegment);
result = newBuilder;
segmentState->builders[id] = kj::mv(newBuilder);
}
}
return nullptr;
}
KJ_DASSERT(result->getArray().begin() == baseSegment->getArray().begin());
return result;
}
SegmentReader* ImbuedBuilderArena::tryGetSegment(SegmentId id) {
return imbue(static_cast<SegmentBuilder*>(base->tryGetSegment(id)));
}
void ImbuedBuilderArena::reportReadLimitReached() {
base->reportReadLimitReached();
}
SegmentBuilder* ImbuedBuilderArena::getSegment(SegmentId id) {
return imbue(base->getSegment(id));
}
BuilderArena::AllocateResult ImbuedBuilderArena::allocate(WordCount amount) {
auto result = allocate(amount);
result.segment = imbue(result.segment);
return result;
}
} // namespace _ (private)
} // namespace capnp
......@@ -36,13 +36,19 @@
#include "message.h"
namespace capnp {
class TypelessCapability;
namespace _ { // private
class SegmentReader;
class SegmentBuilder;
class Arena;
class ReaderArena;
class BasicReaderArena;
class ImbuedReaderArena;
class BuilderArena;
class BasicBuilderArena;
class ImbuedBuilderArena;
class ReadLimiter;
class Segment;
......@@ -92,6 +98,7 @@ class SegmentReader {
public:
inline SegmentReader(Arena* arena, SegmentId id, kj::ArrayPtr<const word> ptr,
ReadLimiter* readLimiter);
inline SegmentReader(Arena* arena, const SegmentReader& base);
KJ_ALWAYS_INLINE(bool containsInterval(const void* from, const void* to));
......@@ -116,12 +123,13 @@ private:
KJ_DISALLOW_COPY(SegmentReader);
friend class SegmentBuilder;
friend class ImbuedSegmentBuilder;
};
class SegmentBuilder: public SegmentReader {
public:
inline SegmentBuilder(BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr,
ReadLimiter* readLimiter);
ReadLimiter* readLimiter, word** pos);
KJ_ALWAYS_INLINE(word* allocate(WordCount amount));
inline word* getPtrUnchecked(WordCount offset);
......@@ -133,11 +141,35 @@ public:
inline void reset();
private:
word* pos;
word** pos;
// Pointer to a pointer to the current end point of the segment, i.e. the location where the
// next object should be allocated. The extra level of indirection allows an
// ImbuedSegmentBuilder to share this pointer with the underlying BasicSegmentBuilder.
friend class ImbuedSegmentBuilder;
KJ_DISALLOW_COPY(SegmentBuilder);
};
class BasicSegmentBuilder: public SegmentBuilder {
public:
inline BasicSegmentBuilder(BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr,
ReadLimiter* readLimiter);
private:
word* actualPos;
KJ_DISALLOW_COPY(BasicSegmentBuilder);
};
class ImbuedSegmentBuilder: public SegmentBuilder {
public:
inline ImbuedSegmentBuilder(SegmentBuilder* base);
inline ImbuedSegmentBuilder(decltype(nullptr));
KJ_DISALLOW_COPY(ImbuedSegmentBuilder);
};
class Arena {
public:
virtual ~Arena() noexcept(false);
......@@ -150,14 +182,16 @@ public:
// the VALIDATE_INPUT() macro which may throw an exception; if it return normally, the caller
// will need to continue with default values.
// TODO(someday): Methods to deal with bundled capabilities.
virtual kj::Own<TypelessCapability> extractCap(const _::StructReader& capDescriptor);
// Given a StructReader for a capability descriptor embedded in the message, return the
// corresponding capability.
};
class ReaderArena final: public Arena {
class BasicReaderArena final: public Arena {
public:
ReaderArena(MessageReader* message);
~ReaderArena() noexcept(false);
KJ_DISALLOW_COPY(ReaderArena);
BasicReaderArena(MessageReader* message);
~BasicReaderArena() noexcept(false);
KJ_DISALLOW_COPY(BasicReaderArena);
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
......@@ -174,15 +208,33 @@ private:
kj::MutexGuarded<kj::Maybe<kj::Own<SegmentMap>>> moreSegments;
};
class BuilderArena final: public Arena {
class ImbuedReaderArena final: public Arena {
public:
BuilderArena(MessageBuilder* message);
~BuilderArena() noexcept(false);
KJ_DISALLOW_COPY(BuilderArena);
ImbuedReaderArena(Arena* base);
~ImbuedReaderArena() noexcept(false);
KJ_DISALLOW_COPY(ImbuedReaderArena);
inline SegmentBuilder* getRootSegment() { return &segment0; }
SegmentReader* imbue(SegmentReader* base);
SegmentBuilder* getSegment(SegmentId id);
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
private:
Arena* base;
// Optimize for single-segment messages so that small messages are handled quickly.
SegmentReader segment0;
typedef std::unordered_map<SegmentReader*, kj::Own<SegmentReader>> SegmentMap;
kj::MutexGuarded<kj::Maybe<kj::Own<SegmentMap>>> moreSegments;
};
class BuilderArena: public Arena {
public:
virtual ~BuilderArena() noexcept(false);
virtual SegmentBuilder* getSegment(SegmentId id) = 0;
// Get the segment with the given id. Crashes or throws an exception if no such segment exists.
struct AllocateResult {
......@@ -190,37 +242,84 @@ public:
word* words;
};
AllocateResult allocate(WordCount amount);
virtual AllocateResult allocate(WordCount amount) = 0;
// Find a segment with at least the given amount of space available and allocate the space.
// Note that allocating directly from a particular segment is much faster, but allocating from
// the arena is guaranteed to succeed. Therefore callers should try to allocate from a specific
// segment first if there is one, then fall back to the arena.
virtual void injectCap(_::PointerBuilder pointer, kj::Own<TypelessCapability>&& cap) = 0;
// Add the capability to the message and initialize the given pointer as an interface pointer
// pointing to this cap.
};
class BasicBuilderArena final: public BuilderArena {
// A BuilderArena that does not allow the injection of capabilities.
public:
BasicBuilderArena(MessageBuilder* message);
~BasicBuilderArena() noexcept(false);
KJ_DISALLOW_COPY(BasicBuilderArena);
inline SegmentBuilder* getRootSegment() { return &segment0; }
kj::ArrayPtr<const kj::ArrayPtr<const word>> getSegmentsForOutput();
// Get an array of all the segments, suitable for writing out. This only returns the allocated
// portion of each segment, whereas tryGetSegment() returns something that includes
// not-yet-allocated space.
// TODO(someday): Methods to deal with bundled capabilities.
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
// implements BuilderArena -----------------------------------------
SegmentBuilder* getSegment(SegmentId id) override;
AllocateResult allocate(WordCount amount) override;
private:
MessageBuilder* message;
ReadLimiter dummyLimiter;
SegmentBuilder segment0;
BasicSegmentBuilder segment0;
kj::ArrayPtr<const word> segment0ForOutput;
struct MultiSegmentState {
std::vector<kj::Own<SegmentBuilder>> builders;
std::vector<kj::Own<BasicSegmentBuilder>> builders;
std::vector<kj::ArrayPtr<const word>> forOutput;
};
kj::MutexGuarded<kj::Maybe<kj::Own<MultiSegmentState>>> moreSegments;
};
class ImbuedBuilderArena final: public BuilderArena {
// A BuilderArena imbued with the ability to inject capabilities.
public:
ImbuedBuilderArena(BuilderArena* base);
~ImbuedBuilderArena() noexcept(false);
KJ_DISALLOW_COPY(ImbuedBuilderArena);
SegmentBuilder* imbue(SegmentBuilder* baseSegment);
// Return an imbued SegmentBuilder corresponding to the given segment from the base arena.
// implements Arena ------------------------------------------------
SegmentReader* tryGetSegment(SegmentId id) override;
void reportReadLimitReached() override;
// implements BuilderArena -----------------------------------------
SegmentBuilder* getSegment(SegmentId id) override;
AllocateResult allocate(WordCount amount) override;
private:
BuilderArena* base;
ImbuedSegmentBuilder segment0;
struct MultiSegmentState {
std::vector<kj::Maybe<kj::Own<ImbuedSegmentBuilder>>> builders;
};
kj::MutexGuarded<kj::Maybe<kj::Own<MultiSegmentState>>> moreSegments;
};
// =======================================================================================
inline ReadLimiter::ReadLimiter()
......@@ -250,6 +349,9 @@ inline SegmentReader::SegmentReader(Arena* arena, SegmentId id, kj::ArrayPtr<con
ReadLimiter* readLimiter)
: arena(arena), id(id), ptr(ptr), readLimiter(readLimiter) {}
inline SegmentReader::SegmentReader(Arena* arena, const SegmentReader& base)
: arena(arena), id(base.id), ptr(base.ptr), readLimiter(base.readLimiter) {}
inline bool SegmentReader::containsInterval(const void* from, const void* to) {
return from >= this->ptr.begin() && to <= this->ptr.end() &&
readLimiter->canRead(
......@@ -271,12 +373,11 @@ inline void SegmentReader::unread(WordCount64 amount) { readLimiter->unread(amou
// -------------------------------------------------------------------
inline SegmentBuilder::SegmentBuilder(
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter)
: SegmentReader(arena, id, ptr, readLimiter),
pos(ptr.begin()) {}
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter, word** pos)
: SegmentReader(arena, id, ptr, readLimiter), pos(pos) {}
inline word* SegmentBuilder::allocate(WordCount amount) {
word* result = __atomic_fetch_add(&pos, amount * BYTES_PER_WORD / BYTES, __ATOMIC_RELAXED);
word* result = __atomic_fetch_add(pos, amount * BYTES_PER_WORD / BYTES, __ATOMIC_RELAXED);
// Careful about pointer arithmetic here. The segment might be at the end of the address space,
// or `amount` could be ridiculously huge.
......@@ -287,7 +388,7 @@ inline word* SegmentBuilder::allocate(WordCount amount) {
// other thread could have accidentally allocated space in this segment in the meantime.
// We need to back up the pointer so that it will be correct when the data is written out
// (and also so that another allocation can potentially use the remaining space).
__atomic_store_n(&pos, result, __ATOMIC_RELAXED);
__atomic_store_n(pos, result, __ATOMIC_RELAXED);
}
return nullptr;
} else {
......@@ -309,15 +410,27 @@ inline BuilderArena* SegmentBuilder::getArena() {
}
inline kj::ArrayPtr<const word> SegmentBuilder::currentlyAllocated() {
return kj::arrayPtr(ptr.begin(), pos - ptr.begin());
return kj::arrayPtr(ptr.begin(), *pos - ptr.begin());
}
inline void SegmentBuilder::reset() {
word* start = getPtrUnchecked(0 * WORDS);
memset(start, 0, (pos - start) * sizeof(word));
pos = start;
memset(start, 0, (*pos - start) * sizeof(word));
*pos = start;
}
inline BasicSegmentBuilder::BasicSegmentBuilder(
BuilderArena* arena, SegmentId id, kj::ArrayPtr<word> ptr, ReadLimiter* readLimiter)
: SegmentBuilder(arena, id, ptr, readLimiter, &actualPos),
actualPos(ptr.begin()) {}
inline ImbuedSegmentBuilder::ImbuedSegmentBuilder(SegmentBuilder* base)
: SegmentBuilder(static_cast<BuilderArena*>(base->arena), base->id,
kj::arrayPtr(const_cast<word*>(base->ptr.begin()), base->ptr.size()),
base->readLimiter, base->pos) {}
inline ImbuedSegmentBuilder::ImbuedSegmentBuilder(decltype(nullptr))
: SegmentBuilder(nullptr, SegmentId(0), nullptr, nullptr, nullptr) {}
} // namespace _ (private)
} // namespace capnp
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "capability.h"
namespace capnp {
namespace _ { // private
CallResult::Pipeline CallResult::Pipeline::getPointerField(uint16_t pointerIndex) const {
auto newOps = kj::heapArray<PipelineManager::Op>(ops.size() + 1);
for (auto i: kj::indices(ops)) {
newOps[i] = ops[i];
}
auto& newOp = newOps[ops.size()];
newOp.type = PipelineManager::Op::GET_POINTER_FIELD;
newOp.pointerIndex = pointerIndex;
return Pipeline(call->addRef(), kj::mv(newOps));
}
} // namespace _ (private)
} // namespace capnp
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNP_CAPABILITY_H_
#define CAPNP_CAPABILITY_H_
#include <kj/async.h>
#include "object.h"
namespace capnp {
template <typename T>
class RemotePromise: public kj::Promise<kj::Own<ReaderFor<T>>>, public T::Pipeline {
// A Promise which supports pipelined calls. T is typically a struct type.
public:
inline RemotePromise(kj::Promise<kj::Own<ReaderFor<T>>>&& promise,
typename T::Pipeline&& pipeline)
: kj::Promise<kj::Own<ReaderFor<T>>>(kj::mv(promise)),
T::Pipeline(kj::mv(pipeline)) {}
inline RemotePromise(decltype(nullptr))
: kj::Promise<kj::Own<ReaderFor<T>>>(nullptr) {}
KJ_DISALLOW_COPY(RemotePromise);
RemotePromise(RemotePromise&& other) = default;
RemotePromise& operator=(RemotePromise&& other) = default;
};
// =======================================================================================
class Call;
struct CallResult;
class TypelessCapability {
// This is an internal type used to represent a live capability (or a promise for a capability).
// This class should not be used directly by applications; it is intended to be used by the
// generated code wrappers.
public:
virtual kj::Own<Call> newCall(uint64_t interfaceId, uint16_t methodId) const = 0;
// Begin a new call to a method of this capability.
virtual kj::Own<TypelessCapability> addRef() const = 0;
// Return a new reference-counted pointer to the same capability. (Reference counting can be
// implemented using a special Disposer, so that the returned pointer actually has the same
// identity as the original.)
virtual kj::Promise<void> whenResolved() const = 0;
// If the capability is actually only a promise, the returned promise resolves once the
// capability itself has resolved to its final destination (or propagates the exception if
// the capability promise is rejected). This is mainly useful for error-checking in the case
// where no calls are being made. There is no reason to wait for this before making calls; if
// the capability does not resolve, the call results will propagate the error.
// TODO(soon): method implementing Join
};
// =======================================================================================
class Call {
public:
virtual ObjectPointer::Builder getRequest() = 0;
// Get the request object for this call, to be filled in before sending.
virtual RemotePromise<CallResult> send() = 0;
// Send the call and return a promise for the result.
};
class CallRunner {
// Implements pipelined requests for a particular outstanding call.
public:
virtual kj::Own<CallRunner> addRef() const = 0;
// Increment this object's reference count.
struct PipelineOp {
enum Type {
GET_POINTER_FIELD
};
Type type;
union {
uint16_t pointerIndex;
};
};
virtual kj::Own<TypelessCapability> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) const = 0;
// Extract a promised Capability from the answer.
};
struct CallResult {
// Result of a call. Designed to be used as RemotePromise<CallResult>.
typedef ObjectPointer::Reader Reader;
// So RemotePromise<CallResult> resolves to Own<ObjectPointer::Reader>.
class Pipeline {
public:
inline explicit Pipeline(kj::Own<CallRunner>&& runner): runner(kj::mv(runner)) {}
Pipeline getPointerField(uint16_t pointerIndex) const;
// Return a new Promise representing a sub-object of the result. `pointerIndex` is the index
// of the sub-object within the pointer section of the result (the result must be a struct).
//
// TODO(kenton): On GCC 4.8 / Clang 3.3, use rvalue qualifiers to avoid the need for copies.
// Make `ops` into a Vector so that it can be extended without allocation in this case.
inline kj::Own<TypelessCapability> asCap() const { return runner->getPipelinedCap(ops); }
// Expect that the result is a capability and construct a pipelined version of it now.
private:
kj::Own<CallRunner> runner;
kj::Array<CallRunner::PipelineOp> ops;
inline Pipeline(kj::Own<CallRunner>&& runner, kj::Array<CallRunner::PipelineOp>&& ops)
: runner(kj::mv(runner)), ops(kj::mv(ops)) {}
};
};
// =======================================================================================
// Classes for imbuing message readers/builders with a capability context.
//
// These classes are for use by RPC implementations. Application code need not know about them.
//
// TODO(kenton): Move these to a separate header.
//
// Normally, MessageReader and MessageBuilder do not support interface pointers because they
// are not RPC-aware and so have no idea how to convert between a serialized CapabilityDescriptor
// and a live capability. To fix this, a reader/builder object needs to be "imbued" with a
// capability context. This creates a new reader/builder which points at the same object but has
// the ability to deal with interface fields.
namespace _ { // private
class ImbuedReaderArena;
class ImbuedBuilderArena;
} // namespace _ (private)
class CapExtractorBase {
// Non-template base class for CapExtractor<T>.
private:
virtual kj::Own<TypelessCapability> extractCapInternal(const _::StructReader& capDescriptor) = 0;
friend class _::ImbuedReaderArena;
};
class CapInjectorBase {
// Non-template base class for CapInjector<T>.
private:
virtual void injectCapInternal(_::PointerBuilder builder, kj::Own<TypelessCapability>&& cap) = 0;
friend class _::ImbuedBuilderArena;
};
template <typename CapDescriptor>
class CapExtractor: public CapExtractorBase {
// Callback used to read a capability from a message, implemented by the RPC system.
// `CapDescriptor` is the struct type which the RPC implementation uses to represent
// capabilities. (On the wire, an interface pointer actually points to a struct of this type.)
public:
virtual kj::Own<TypelessCapability> extractCap(typename CapDescriptor::Reader descriptor) = 0;
// Given the descriptor read off the wire, construct a live capability.
private:
kj::Own<TypelessCapability> extractCapInternal(
const _::StructReader& capDescriptor) override final {
return extractCap(typename CapDescriptor::Reader(capDescriptor));
}
};
template <typename CapDescriptor>
class CapInjector: public CapInjectorBase {
// Callback used to write a capability into a message, implemented by the RPC system.
// `CapDescriptor` is the struct type which the RPC implementation uses to represent
// capabilities. (On the wire, an interface pointer actually points to a struct of this type.)
public:
virtual void injectCap(typename CapDescriptor::Builder descriptor,
kj::Own<TypelessCapability>&& cap) = 0;
// Fill in the given descriptor so that it describes the given capability.
private:
void injectCapInternal(_::PointerBuilder builder,
kj::Own<TypelessCapability>&& cap) override final {
injectCap(
typename CapDescriptor::Builder(builder.initCapDescriptor(_::structSize<CapDescriptor>())),
kj::mv(cap));
}
};
class CapReaderContext {
// Class which can "imbue" reader objects from some other message with a capability context,
// so that interface pointers found in the message can be extracted and called.
public:
CapReaderContext(Orphanage arena, CapExtractorBase& extractor);
~CapReaderContext() noexcept(false);
ObjectPointer::Reader imbue(ObjectPointer::Reader base);
private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
virtual kj::Own<TypelessCapability> extractCapInternal(const _::StructReader& capDescriptor) = 0;
friend class _::ImbuedReaderArena;
};
class CapBuilderContext {
// Class which can "imbue" reader objects from some other message with a capability context,
// so that interface pointers found in the message can be set to point at live capabilities.
public:
CapBuilderContext(Orphanage arena, CapInjectorBase& injector);
~CapBuilderContext() noexcept(false);
ObjectPointer::Builder imbue(ObjectPointer::Builder base);
private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
virtual void injectCapInternal(_::PointerBuilder builder, kj::Own<TypelessCapability>&& cap) = 0;
friend class _::ImbuedBuilderArena;
};
} // namespace capnp
#endif // CAPNP_CAPABILITY_H_
......@@ -279,7 +279,7 @@ static void checkStruct(StructReader reader) {
TEST(WireFormat, StructRoundTrip_OneSegment) {
MallocMessageBuilder message;
BuilderArena arena(&message);
BasicBuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......@@ -316,7 +316,7 @@ TEST(WireFormat, StructRoundTrip_OneSegment) {
TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
MallocMessageBuilder message(0, AllocationStrategy::FIXED_SIZE);
BuilderArena arena(&message);
BasicBuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......@@ -354,7 +354,7 @@ TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
TEST(WireFormat, StructRoundTrip_MultipleSegmentsWithMultipleAllocations) {
MallocMessageBuilder message(8, AllocationStrategy::FIXED_SIZE);
BuilderArena arena(&message);
BasicBuilderArena arena(&message);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
......
......@@ -32,11 +32,15 @@
#define CAPNP_LAYOUT_H_
#include <kj/common.h>
#include <kj/memory.h>
#include "common.h"
#include "blob.h"
#include "endian.h"
namespace capnp {
class TypelessCapability;
namespace _ { // private
class PointerBuilder;
......@@ -280,6 +284,7 @@ public:
ListBuilder getList(FieldSize elementSize, const word* defaultValzue);
ListBuilder getStructList(StructSize elementSize, const word* defaultValue);
template <typename T> typename T::Builder getBlob(const void* defaultValue,ByteCount defaultSize);
kj::Own<TypelessCapability> getCapability();
// Get methods: Get the value. If it is null, initialize it to a copy of the default value.
// The default value is encoded as an "unchecked message" for structs, lists, and objects, or a
// simple byte array for blobs.
......@@ -288,12 +293,14 @@ public:
ListBuilder initList(FieldSize elementSize, ElementCount elementCount);
ListBuilder initStructList(ElementCount elementCount, StructSize size);
template <typename T> typename T::Builder initBlob(ByteCount size);
StructBuilder initCapDescriptor(StructSize size);
// Init methods: Initialize the pointer to a newly-allocated object, discarding the existing
// object.
void setStruct(const StructReader& value);
void setList(const ListReader& value);
template <typename T> void setBlob(typename T::Reader value);
void setCapability(kj::Own<TypelessCapability>&& cap);
// Set methods: Initialize the pointer to a newly-allocated copy of the given value, discarding
// the existing object.
......@@ -335,6 +342,7 @@ public:
ListReader getList(FieldSize expectedElementSize, const word* defaultValue) const;
template <typename T>
typename T::Reader getBlob(const void* defaultValue, ByteCount defaultSize) const;
kj::Own<TypelessCapability> getCapability();
// Get methods: Get the value. If it is null, return the default value instead.
// The default value is encoded as an "unchecked message" for structs, lists, and objects, or a
// simple byte array for blobs.
......
......@@ -38,16 +38,16 @@ namespace capnp {
MessageReader::MessageReader(ReaderOptions options): options(options), allocatedArena(false) {}
MessageReader::~MessageReader() noexcept(false) {
if (allocatedArena) {
arena()->~ReaderArena();
arena()->~BasicReaderArena();
}
}
_::StructReader MessageReader::getRootInternal() {
if (!allocatedArena) {
static_assert(sizeof(_::ReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a ReaderArena. Please increase it. This will break "
static_assert(sizeof(_::BasicReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BasicReaderArena. Please increase it. This will break "
"ABI compatibility.");
new(arena()) _::ReaderArena(this);
new(arena()) _::BasicReaderArena(this);
allocatedArena = true;
}
......@@ -74,7 +74,7 @@ _::SegmentBuilder* MessageBuilder::getRootSegment() {
if (allocatedArena) {
return arena()->getSegment(_::SegmentId(0));
} else {
static_assert(sizeof(_::BuilderArena) <= sizeof(arenaSpace),
static_assert(sizeof(_::BasicBuilderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BuilderArena. Please increase it.");
kj::ctor(*arena(), this);
allocatedArena = true;
......
......@@ -33,8 +33,8 @@
namespace capnp {
namespace _ { // private
class ReaderArena;
class BuilderArena;
class BasicReaderArena;
class BasicBuilderArena;
}
class StructSchema;
......@@ -127,7 +127,7 @@ private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
bool allocatedArena;
_::ReaderArena* arena() { return reinterpret_cast<_::ReaderArena*>(arenaSpace); }
_::BasicReaderArena* arena() { return reinterpret_cast<_::BasicReaderArena*>(arenaSpace); }
_::StructReader getRootInternal();
};
......@@ -202,7 +202,7 @@ private:
// isn't constructed yet. This is kind of annoying because it means that getOrphanage() is
// not thread-safe, but that shouldn't be a huge deal...
_::BuilderArena* arena() { return reinterpret_cast<_::BuilderArena*>(arenaSpace); }
_::BasicBuilderArena* arena() { return reinterpret_cast<_::BasicBuilderArena*>(arenaSpace); }
_::SegmentBuilder* getRootSegment();
_::StructBuilder initRoot(_::StructSize size);
void setRootInternal(_::StructReader reader);
......
......@@ -602,6 +602,12 @@ struct Join {
keyPart @2 :JoinKeyPart;
# A part of the join key. These combine to form the complete join key which is used to establish
# a direct connection.
# TODO(now): Change this so that multiple parts can be sent in a single Join message, so that
# if multiple join parts are going to cross the same connection they can be sent together, so that
# the receive can potentially optimize its handling of them. In the case where all parts are
# bundled together, should the recipient be expected to simply return a cap, so that the caller
# can immediately start pipelining to it?
}
# ========================================================================================
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment