Commit 68733fb7 authored by Kenton Varda's avatar Kenton Varda

Thread-safe message builders... not performing well. Might revert.

parent 64075f16
......@@ -38,8 +38,9 @@ void ReadLimiter::unread(WordCount64 amount) {
// Be careful not to overflow here. Since ReadLimiter has no thread-safety, it's possible that
// the limit value was not updated correctly for one or more reads, and therefore unread() could
// overflow it even if it is only unreading bytes that were acutally read.
WordCount64 newValue = limit + amount;
if (newValue > limit) {
uint64_t oldValue = limit;
uint64_t newValue = oldValue + amount / WORDS;
if (newValue > oldValue) {
limit = newValue;
}
}
......@@ -109,52 +110,63 @@ SegmentBuilder* BuilderArena::getSegment(SegmentId id) {
// This method is allowed to fail if the segment ID is not valid.
if (id == SegmentId(0)) {
return &segment0;
} else KJ_IF_MAYBE(s, moreSegments) {
KJ_REQUIRE(id.value - 1 < s->builders.size(), "invalid segment id", id.value);
return s->builders[id.value - 1].get();
} else {
KJ_FAIL_REQUIRE("invalid segment id", id.value);
auto lock = moreSegments.lockShared();
KJ_IF_MAYBE(s, *lock) {
KJ_REQUIRE(id.value - 1 < s->builders.size(), "invalid segment id", id.value);
// TODO(cleanup): Return a const SegmentBuilder and tediously constify all SegmentBuilder
// pointers throughout the codebase.
return const_cast<SegmentBuilder*>(s->builders[id.value - 1].get());
} else {
KJ_FAIL_REQUIRE("invalid segment id", id.value);
}
}
}
SegmentBuilder* BuilderArena::getSegmentWithAvailable(WordCount minimumAvailable) {
// TODO(someday): Mutex-locking? Do we want to allow people to build different parts of the
// same message in different threads?
BuilderArena::AllocateResult BuilderArena::allocate(WordCount amount) {
if (segment0.getArena() == nullptr) {
// We're allocating the first segment.
kj::ArrayPtr<word> ptr = message->allocateSegment(minimumAvailable / WORDS);
// We're allocating the first segment. We don't need to worry about threads at this point
// because calling MessageBuilder::initRoot() from multiple threads is not intended to be safe.
kj::ArrayPtr<word> ptr = message->allocateSegment(amount / WORDS);
// Re-allocate segment0 in-place. This is a bit of a hack, but we have not returned any
// pointers to this segment yet, so it should be fine.
segment0.~SegmentBuilder();
return new (&segment0) SegmentBuilder(this, SegmentId(0), ptr, &this->dummyLimiter);
kj::dtor(segment0);
kj::ctor(segment0, this, SegmentId(0), ptr, &this->dummyLimiter);
return AllocateResult { &segment0, segment0.allocate(amount) };
} else {
if (segment0.available() >= minimumAvailable) {
return &segment0;
// Check if there is space in the first segment. We can do this without locking.
word* attempt = segment0.allocate(amount);
if (attempt != nullptr) {
return AllocateResult { &segment0, attempt };
}
// Need to fall back to additional segments.
auto lock = moreSegments.lockExclusive();
MultiSegmentState* segmentState;
KJ_IF_MAYBE(s, moreSegments) {
KJ_IF_MAYBE(s, *lock) {
// TODO(perf): Check for available space in more than just the last segment. We don't
// want this to be O(n), though, so we'll need to maintain some sort of table. Complicating
// matters, we want SegmentBuilders::allocate() to be fast, so we can't update any such
// table when allocation actually happens. Instead, we could have a priority queue based
// on the last-known available size, and then re-check the size when we pop segments off it
// and shove them to the back of the queue if they have become too small.
if (s->builders.back()->available() >= minimumAvailable) {
return s->builders.back().get();
attempt = s->builders.back()->allocate(amount);
if (attempt != nullptr) {
return AllocateResult { s->builders.back().get(), attempt };
}
segmentState = s;
} else {
auto newSegmentState = kj::heap<MultiSegmentState>();
segmentState = newSegmentState;
moreSegments = kj::mv(newSegmentState);
*lock = kj::mv(newSegmentState);
}
kj::Own<SegmentBuilder> newBuilder = kj::heap<SegmentBuilder>(
this, SegmentId(segmentState->builders.size() + 1),
message->allocateSegment(minimumAvailable / WORDS), &this->dummyLimiter);
message->allocateSegment(amount / WORDS), &this->dummyLimiter);
SegmentBuilder* result = newBuilder.get();
segmentState->builders.push_back(kj::mv(newBuilder));
......@@ -162,7 +174,9 @@ SegmentBuilder* BuilderArena::getSegmentWithAvailable(WordCount minimumAvailable
// getSegmentsForOutput(), which callers might reasonably expect is a thread-safe method.
segmentState->forOutput.resize(segmentState->builders.size() + 1);
return result;
// Allocating from the new segment is guaranteed to succeed since no other thread could have
// received a pointer to it yet (since we still hold the lock).
return AllocateResult { result, result->allocate(amount) };
}
}
......@@ -172,7 +186,7 @@ kj::ArrayPtr<const kj::ArrayPtr<const word>> BuilderArena::getSegmentsForOutput(
// segments is actually changing due to an activity in another thread, then the caller has a
// problem regardless of locking here.
KJ_IF_MAYBE(segmentState, moreSegments) {
KJ_IF_MAYBE(segmentState, moreSegments.getWithoutLock()) {
KJ_DASSERT(segmentState->forOutput.size() == segmentState->builders.size() + 1,
"segmentState->forOutput wasn't resized correctly when the last builder was added.",
segmentState->forOutput.size(), segmentState->builders.size());
......@@ -206,9 +220,13 @@ SegmentReader* BuilderArena::tryGetSegment(SegmentId id) {
return &segment0;
}
} else {
KJ_IF_MAYBE(segmentState, moreSegments) {
auto lock = moreSegments.lockShared();
KJ_IF_MAYBE(segmentState, *lock) {
if (id.value <= segmentState->builders.size()) {
return segmentState->builders[id.value - 1].get();
// TODO(cleanup): Return a const SegmentReader and tediously constify all SegmentBuilder
// pointers throughout the codebase.
return const_cast<SegmentReader*>(kj::implicitCast<const SegmentReader*>(
segmentState->builders[id.value - 1].get()));
}
}
return nullptr;
......
......@@ -31,6 +31,7 @@
#include <vector>
#include <unordered_map>
#include <kj/common.h>
#include <kj/mutex.h>
#include "common.h"
#include "message.h"
......@@ -61,6 +62,10 @@ class ReadLimiter {
// readers. If you call the same getter twice, the data it returns may be double-counted. This
// should not be a big deal in most cases -- just set the read limit high enough that it will
// only trigger in unreasonable cases.
//
// This class is "safe" to use from multiple threads for its intended use case. Threads may
// overwrite each others' changes to the counter, but this is OK because it only means that the
// limit is enforced a bit less strictly -- it will still kick in eventually.
public:
inline explicit ReadLimiter(); // No limit.
......@@ -75,7 +80,10 @@ public:
// some data.
private:
WordCount64 limit;
volatile uint64_t limit;
// Current limit, decremented each time catRead() is called. Volatile because multiple threads
// could be trying to modify it at once. (This is not real thread-safety, but good enough for
// the purpose of this class. See class comment.)
KJ_DISALLOW_COPY(ReadLimiter);
};
......@@ -120,8 +128,6 @@ public:
inline BuilderArena* getArena();
inline WordCount available();
inline kj::ArrayPtr<const word> currentlyAllocated();
inline void reset();
......@@ -174,12 +180,21 @@ public:
~BuilderArena() noexcept(false);
KJ_DISALLOW_COPY(BuilderArena);
inline SegmentBuilder* getRootSegment() { return &segment0; }
SegmentBuilder* getSegment(SegmentId id);
// Get the segment with the given id. Crashes or throws an exception if no such segment exists.
SegmentBuilder* getSegmentWithAvailable(WordCount minimumAvailable);
// Get a segment which has at least the given amount of space available, allocating it if
// necessary. Crashes or throws an exception if there is not enough memory.
struct AllocateResult {
SegmentBuilder* segment;
word* words;
};
AllocateResult allocate(WordCount amount);
// Find a segment with at least the given amount of space available and allocate the space.
// Note that allocating directly from a particular segment is much faster, but allocating from
// the arena is guaranteed to succeed. Therefore callers should try to allocate from a specific
// segment first if there is one, then fall back to the arena.
kj::ArrayPtr<const kj::ArrayPtr<const word>> getSegmentsForOutput();
// Get an array of all the segments, suitable for writing out. This only returns the allocated
......@@ -203,25 +218,28 @@ private:
std::vector<kj::Own<SegmentBuilder>> builders;
std::vector<kj::ArrayPtr<const word>> forOutput;
};
kj::Maybe<kj::Own<MultiSegmentState>> moreSegments;
kj::MutexGuarded<kj::Maybe<kj::Own<MultiSegmentState>>> moreSegments;
};
// =======================================================================================
inline ReadLimiter::ReadLimiter()
// I didn't want to #include <limits> just for this one lousy constant.
: limit(uint64_t(0x7fffffffffffffffll) * WORDS) {}
: limit(0x7fffffffffffffffllu) {}
inline ReadLimiter::ReadLimiter(WordCount64 limit): limit(limit) {}
inline ReadLimiter::ReadLimiter(WordCount64 limit): limit(limit / WORDS) {}
inline void ReadLimiter::reset(WordCount64 limit) { this->limit = limit; }
inline void ReadLimiter::reset(WordCount64 limit) { this->limit = limit / WORDS; }
inline bool ReadLimiter::canRead(WordCount amount, Arena* arena) {
if (KJ_UNLIKELY(amount > limit)) {
// Be careful not to store an underflowed value into `limit`, even if multiple threads are
// decrementing it.
uint64_t current = limit;
if (KJ_UNLIKELY(amount / WORDS > current)) {
arena->reportReadLimitReached();
return false;
} else {
limit -= amount;
limit = current - amount / WORDS;
return true;
}
}
......@@ -258,13 +276,22 @@ inline SegmentBuilder::SegmentBuilder(
pos(ptr.begin()) {}
inline word* SegmentBuilder::allocate(WordCount amount) {
if (amount > intervalLength(pos, ptr.end())) {
word* result = __atomic_fetch_add(&pos, amount * BYTES_PER_WORD / BYTES, __ATOMIC_RELAXED);
// Careful about pointer arithmetic here. The segment might be at the end of the address space,
// or `amount` could be ridiculously huge.
if (ptr.end() - (result + amount) < 0) {
// Not enough space in the segment for this allocation.
if (ptr.end() - result >= 0) {
// It was our increment that pushed the pointer past the end of the segment. Therefore no
// other thread could have accidentally allocated space in this segment in the meantime.
// We need to back up the pointer so that it will be correct when the data is written out
// (and also so that another allocation can potentially use the remaining space).
__atomic_store_n(&pos, result, __ATOMIC_RELAXED);
}
return nullptr;
} else {
// TODO(someday): Atomic increment, backtracking if we go over, would make this thread-safe.
// How much would it cost in the single-threaded case? Is it free? Benchmark it.
word* result = pos;
pos += amount;
// Success.
return result;
}
}
......@@ -281,10 +308,6 @@ inline BuilderArena* SegmentBuilder::getArena() {
return static_cast<BuilderArena*>(arena);
}
inline WordCount SegmentBuilder::available() {
return intervalLength(pos, ptr.end());
}
inline kj::ArrayPtr<const word> SegmentBuilder::currentlyAllocated() {
return kj::arrayPtr(ptr.begin(), pos - ptr.begin());
}
......
......@@ -1559,12 +1559,12 @@ DynamicList::Builder PointerHelpers<DynamicList, Kind::UNKNOWN>::init(
// -------------------------------------------------------------------
Orphan<DynamicStruct> Orphanage::newOrphan(StructSchema schema) {
Orphan<DynamicStruct> Orphanage::newOrphan(StructSchema schema) const {
return Orphan<DynamicStruct>(
schema, _::OrphanBuilder::initStruct(arena, structSizeFromSchema(schema)));
}
Orphan<DynamicList> Orphanage::newOrphan(ListSchema schema, uint size) {
Orphan<DynamicList> Orphanage::newOrphan(ListSchema schema, uint size) const {
if (schema.whichElementType() == schema::Type::Body::STRUCT_TYPE) {
return Orphan<DynamicList>(schema, _::OrphanBuilder::initStructList(
arena, size * ELEMENTS, structSizeFromSchema(schema.getStructElementType())));
......
......@@ -748,14 +748,14 @@ struct Orphanage::GetInnerBuilder<DynamicList, Kind::UNKNOWN> {
template <>
inline Orphan<DynamicStruct> Orphanage::newOrphanCopy<DynamicStruct::Reader>(
const DynamicStruct::Reader& copyFrom) {
const DynamicStruct::Reader& copyFrom) const {
return Orphan<DynamicStruct>(
copyFrom.getSchema(), _::OrphanBuilder::copy(arena, copyFrom.reader));
}
template <>
inline Orphan<DynamicList> Orphanage::newOrphanCopy<DynamicList::Reader>(
const DynamicList::Reader& copyFrom) {
const DynamicList::Reader& copyFrom) const {
return Orphan<DynamicList>(copyFrom.getSchema(), _::OrphanBuilder::copy(arena, copyFrom.reader));
}
......
......@@ -23,6 +23,7 @@
#include <capnp/test-import.capnp.h>
#include "message.h"
#include <kj/thread.h>
#include <kj/debug.h>
#include <gtest/gtest.h>
#include "test-util.h"
......@@ -1350,6 +1351,54 @@ TEST(Encoding, Has) {
EXPECT_TRUE(root.asReader().hasInt32List());
}
TEST(Encoding, Threads) {
// Use fixed-size segments so that many segments are allocated during the test.
MallocMessageBuilder message(1024, AllocationStrategy::FIXED_SIZE);
kj::MutexGuarded<Orphanage> orphanage(message.getOrphanage());
auto outerLock = orphanage.lockExclusive();
auto threadFunc = [&]() {
int dummy;
uint64_t me = reinterpret_cast<uintptr_t>(&dummy);
{
// Make sure all threads start at the same time.
auto lock = orphanage.lockShared();
// Allocate space for a list. This will always end up allocating a new segment.
auto list = lock->newOrphan<List<List<uint64_t>>>(10000);
auto builder = list.get();
// Allocate a bunch of smaller lists and initialize them to values specific to this thread.
for (uint i = 0; i < builder.size(); i++) {
builder.set(i, {me, me + 1, me + 2, me + 3});
}
// Check that none of the values were corrupted.
for (auto item: list.getReader()) {
ASSERT_EQ(4, item.size());
EXPECT_EQ(me, item[0]);
EXPECT_EQ(me + 1, item[1]);
EXPECT_EQ(me + 2, item[2]);
EXPECT_EQ(me + 3, item[3]);
}
}
};
kj::Thread thread1(threadFunc);
kj::Thread thread2(threadFunc);
kj::Thread thread3(threadFunc);
kj::Thread thread4(threadFunc);
usleep(10000);
auto releaseLock = kj::mv(outerLock);
// On the way out, we'll release the lock, thus allowing the threads to start, then we'll join
// each thread, thus waiting for them all to complete.
}
} // namespace
} // namespace _ (private)
} // namespace capnp
......@@ -271,8 +271,9 @@ static void checkStruct(StructReader reader) {
TEST(WireFormat, StructRoundTrip_OneSegment) {
MallocMessageBuilder message;
BuilderArena arena(&message);
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
StructBuilder builder = StructBuilder::initRoot(
segment, rootLocation, StructSize(2 * WORDS, 4 * POINTERS, FieldSize::INLINE_COMPOSITE));
......@@ -307,8 +308,9 @@ TEST(WireFormat, StructRoundTrip_OneSegment) {
TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
MallocMessageBuilder message(0, AllocationStrategy::FIXED_SIZE);
BuilderArena arena(&message);
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
StructBuilder builder = StructBuilder::initRoot(
segment, rootLocation, StructSize(2 * WORDS, 4 * POINTERS, FieldSize::INLINE_COMPOSITE));
......@@ -344,8 +346,9 @@ TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
TEST(WireFormat, StructRoundTrip_MultipleSegmentsWithMultipleAllocations) {
MallocMessageBuilder message(8, AllocationStrategy::FIXED_SIZE);
BuilderArena arena(&message);
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS);
auto allocation = arena.allocate(1 * WORDS);
SegmentBuilder* segment = allocation.segment;
word* rootLocation = allocation.words;
StructBuilder builder = StructBuilder::initRoot(
segment, rootLocation, StructSize(2 * WORDS, 4 * POINTERS, FieldSize::INLINE_COMPOSITE));
......
......@@ -203,6 +203,16 @@ static const union {
// =======================================================================================
namespace {
template <typename T>
struct SegmentAnd {
SegmentBuilder* segment;
T value;
};
} // namespace
struct WireHelpers {
static KJ_ALWAYS_INLINE(WordCount roundBytesUpToWords(ByteCount bytes)) {
static_assert(sizeof(word) == 8, "This code assumes 64-bit words.");
......@@ -234,32 +244,61 @@ struct WireHelpers {
static KJ_ALWAYS_INLINE(word* allocate(
WirePointer*& ref, SegmentBuilder*& segment, WordCount amount,
WirePointer::Kind kind)) {
if (!ref->isNull()) zeroObject(segment, ref);
word* ptr = segment->allocate(amount);
if (ptr == nullptr) {
// Need to allocate in a new segment. We'll need to allocate an extra pointer worth of
// space to act as the landing pad for a far pointer.
WordCount amountPlusRef = amount + POINTER_SIZE_IN_WORDS;
segment = segment->getArena()->getSegmentWithAvailable(amountPlusRef);
ptr = segment->allocate(amountPlusRef);
// Set up the original pointer to be a far pointer to the new segment.
ref->setFar(false, segment->getOffsetTo(ptr));
ref->farRef.set(segment->getSegmentId());
// Initialize the landing pad to indicate that the data immediately follows the pad.
ref = reinterpret_cast<WirePointer*>(ptr);
ref->setKindAndTarget(kind, ptr + POINTER_SIZE_IN_WORDS);
// Allocated space follows new pointer.
return ptr + POINTER_SIZE_IN_WORDS;
WirePointer::Kind kind, BuilderArena* orphanArena)) {
// Allocate space in the mesasge for a new object, creating far pointers if necessary.
//
// * `ref` starts out being a reference to the pointer which shall be assigned to point at the
// new object. On return, `ref` points to a pointer which needs to be initialized with
// the object's type information. Normally this is the same pointer, but it can change if
// a far pointer was allocated -- in this case, `ref` will end up pointing to the far
// pointer's tag. Either way, `allocate()` takes care of making sure that the original
// pointer ends up leading to the new object. On return, only the upper 32 bit of `*ref`
// need to be filled in by the caller.
// * `segment` starts out pointing to the segment containing `ref`. On return, it points to
// the segment containing the allocated object, which is usually the same segment but could
// be a different one if the original segment was out of space.
// * `amount` is the number of words to allocate.
// * `kind` is the kind of object to allocate. It is used to initialize the pointer. It
// cannot be `FAR` -- far pointers are allocated automatically as needed.
// * `orphanArena` is usually null. If it is non-null, then we're allocating an orphan object.
// In this case, `segment` starts out null; the allocation takes place in an arbitrary
// segment belonging to the arena. `ref` will be initialized as a non-far pointer, but its
// target offset will be set to zero.
if (orphanArena == nullptr) {
if (!ref->isNull()) zeroObject(segment, ref);
word* ptr = segment->allocate(amount);
if (ptr == nullptr) {
// Need to allocate in a new segment. We'll need to allocate an extra pointer worth of
// space to act as the landing pad for a far pointer.
WordCount amountPlusRef = amount + POINTER_SIZE_IN_WORDS;
auto allocation = segment->getArena()->allocate(amountPlusRef);
segment = allocation.segment;
ptr = allocation.words;
// Set up the original pointer to be a far pointer to the new segment.
ref->setFar(false, segment->getOffsetTo(ptr));
ref->farRef.set(segment->getSegmentId());
// Initialize the landing pad to indicate that the data immediately follows the pad.
ref = reinterpret_cast<WirePointer*>(ptr);
ref->setKindAndTarget(kind, ptr + POINTER_SIZE_IN_WORDS);
// Allocated space follows new pointer.
return ptr + POINTER_SIZE_IN_WORDS;
} else {
ref->setKindAndTarget(kind, ptr);
return ptr;
}
} else {
ref->setKindAndTarget(kind, ptr);
return ptr;
// orphanArena is non-null. Allocate an orphan.
auto allocation = orphanArena->allocate(amount);
segment = allocation.segment;
ref->setKindWithZeroOffset(kind);
return allocation.words;
}
}
......@@ -614,7 +653,8 @@ struct WireHelpers {
return nullptr;
} else {
const word* srcPtr = src->target();
word* dstPtr = allocate(dst, segment, src->structRef.wordSize(), WirePointer::STRUCT);
word* dstPtr = allocate(
dst, segment, src->structRef.wordSize(), WirePointer::STRUCT, nullptr);
copyStruct(segment, dstPtr, srcPtr, src->structRef.dataSize.get(),
src->structRef.ptrCount.get());
......@@ -635,7 +675,7 @@ struct WireHelpers {
ElementCount64(src->listRef.elementCount()) *
dataBitsPerElement(src->listRef.elementSize()));
const word* srcPtr = src->target();
word* dstPtr = allocate(dst, segment, wordCount, WirePointer::LIST);
word* dstPtr = allocate(dst, segment, wordCount, WirePointer::LIST, nullptr);
memcpy(dstPtr, srcPtr, wordCount * BYTES_PER_WORD / BYTES);
dst->listRef.set(src->listRef.elementSize(), src->listRef.elementCount());
......@@ -647,7 +687,7 @@ struct WireHelpers {
WirePointer* dstRefs = reinterpret_cast<WirePointer*>(
allocate(dst, segment, src->listRef.elementCount() *
(1 * POINTERS / ELEMENTS) * WORDS_PER_POINTER,
WirePointer::LIST));
WirePointer::LIST, nullptr));
uint n = src->listRef.elementCount() / ELEMENTS;
for (uint i = 0; i < n; i++) {
......@@ -664,7 +704,7 @@ struct WireHelpers {
const word* srcPtr = src->target();
word* dstPtr = allocate(dst, segment,
src->listRef.inlineCompositeWordCount() + POINTER_SIZE_IN_WORDS,
WirePointer::LIST);
WirePointer::LIST, nullptr);
dst->listRef.setInlineComposite(src->listRef.inlineCompositeWordCount());
......@@ -740,10 +780,9 @@ struct WireHelpers {
reinterpret_cast<WirePointer*>(srcSegment->allocate(1 * WORDS));
if (landingPad == nullptr) {
// Darn, need a double-far.
SegmentBuilder* farSegment = srcSegment->getArena()->getSegmentWithAvailable(2 * WORDS);
landingPad = reinterpret_cast<WirePointer*>(farSegment->allocate(2 * WORDS));
KJ_DASSERT(landingPad != nullptr,
"getSegmentWithAvailable() returned segment without space available.");
auto allocation = srcSegment->getArena()->allocate(2 * WORDS);
SegmentBuilder* farSegment = allocation.segment;
landingPad = reinterpret_cast<WirePointer*>(allocation.words);
landingPad[0].setFar(false, srcSegment->getOffsetTo(srcPtr));
landingPad[0].farRef.segmentId.set(srcSegment->getSegmentId());
......@@ -767,9 +806,10 @@ struct WireHelpers {
// -----------------------------------------------------------------
static KJ_ALWAYS_INLINE(StructBuilder initStructPointer(
WirePointer* ref, SegmentBuilder* segment, StructSize size)) {
WirePointer* ref, SegmentBuilder* segment, StructSize size,
BuilderArena* orphanArena = nullptr)) {
// Allocate space for the new struct. Newly-allocated space is automatically zeroed.
word* ptr = allocate(ref, segment, size.total(), WirePointer::STRUCT);
word* ptr = allocate(ref, segment, size.total(), WirePointer::STRUCT, orphanArena);
// Initialize the pointer.
ref->structRef.set(size);
......@@ -786,7 +826,7 @@ struct WireHelpers {
static KJ_ALWAYS_INLINE(StructBuilder getWritableStructPointer(
WirePointer* ref, word* refTarget, SegmentBuilder* segment, StructSize size,
const word* defaultValue)) {
const word* defaultValue, BuilderArena* orphanArena = nullptr)) {
if (ref->isNull()) {
useDefault:
if (defaultValue == nullptr ||
......@@ -824,7 +864,7 @@ struct WireHelpers {
// Don't let allocate() zero out the object just yet.
zeroPointerAndFars(segment, ref);
word* ptr = allocate(ref, segment, totalSize, WirePointer::STRUCT);
word* ptr = allocate(ref, segment, totalSize, WirePointer::STRUCT, orphanArena);
ref->structRef.set(newDataSize, newPointerCount);
// Copy data section.
......@@ -854,7 +894,7 @@ struct WireHelpers {
static KJ_ALWAYS_INLINE(ListBuilder initListPointer(
WirePointer* ref, SegmentBuilder* segment, ElementCount elementCount,
FieldSize elementSize)) {
FieldSize elementSize, BuilderArena* orphanArena = nullptr)) {
KJ_DREQUIRE(elementSize != FieldSize::INLINE_COMPOSITE,
"Should have called initStructListPointer() instead.");
......@@ -866,7 +906,7 @@ struct WireHelpers {
WordCount wordCount = roundBitsUpToWords(ElementCount64(elementCount) * step);
// Allocate the list.
word* ptr = allocate(ref, segment, wordCount, WirePointer::LIST);
word* ptr = allocate(ref, segment, wordCount, WirePointer::LIST, orphanArena);
// Initialize the pointer.
ref->listRef.set(elementSize, elementCount);
......@@ -877,7 +917,7 @@ struct WireHelpers {
static KJ_ALWAYS_INLINE(ListBuilder initStructListPointer(
WirePointer* ref, SegmentBuilder* segment, ElementCount elementCount,
StructSize elementSize)) {
StructSize elementSize, BuilderArena* orphanArena = nullptr)) {
if (elementSize.preferredListEncoding != FieldSize::INLINE_COMPOSITE) {
// Small data-only struct. Allocate a list of primitives instead.
return initListPointer(ref, segment, elementCount, elementSize.preferredListEncoding);
......@@ -887,7 +927,8 @@ struct WireHelpers {
// Allocate the list, prefixed by a single WirePointer.
WordCount wordCount = elementCount * wordsPerElement;
word* ptr = allocate(ref, segment, POINTER_SIZE_IN_WORDS + wordCount, WirePointer::LIST);
word* ptr = allocate(ref, segment, POINTER_SIZE_IN_WORDS + wordCount, WirePointer::LIST,
orphanArena);
// Initialize the pointer.
// INLINE_COMPOSITE lists replace the element count with the word count.
......@@ -1023,7 +1064,7 @@ struct WireHelpers {
}
static KJ_ALWAYS_INLINE(ListBuilder getWritableStructListPointer(
WirePointer* origRef, word* origRefTarget, SegmentBuilder* origSegment,
StructSize elementSize, const word* defaultValue)) {
StructSize elementSize, const word* defaultValue, BuilderArena* orphanArena = nullptr)) {
if (origRef->isNull()) {
useDefault:
if (defaultValue == nullptr ||
......@@ -1082,7 +1123,7 @@ struct WireHelpers {
zeroPointerAndFars(origSegment, origRef);
word* newPtr = allocate(origRef, origSegment, totalSize + POINTER_SIZE_IN_WORDS,
WirePointer::LIST);
WirePointer::LIST, orphanArena);
origRef->listRef.setInlineComposite(totalSize);
WirePointer* newTag = reinterpret_cast<WirePointer*>(newPtr);
......@@ -1187,7 +1228,7 @@ struct WireHelpers {
zeroPointerAndFars(origSegment, origRef);
word* newPtr = allocate(origRef, origSegment, totalWords + POINTER_SIZE_IN_WORDS,
WirePointer::LIST);
WirePointer::LIST, orphanArena);
origRef->listRef.setInlineComposite(totalWords);
WirePointer* tag = reinterpret_cast<WirePointer*>(newPtr);
......@@ -1249,7 +1290,7 @@ struct WireHelpers {
// Don't let allocate() zero out the object just yet.
zeroPointerAndFars(origSegment, origRef);
word* newPtr = allocate(origRef, origSegment, totalWords, WirePointer::LIST);
word* newPtr = allocate(origRef, origSegment, totalWords, WirePointer::LIST, orphanArena);
origRef->listRef.set(elementSize.preferredListEncoding, elementCount);
char* newBytePtr = reinterpret_cast<char*>(newPtr);
......@@ -1278,25 +1319,29 @@ struct WireHelpers {
}
}
static KJ_ALWAYS_INLINE(Text::Builder initTextPointer(
WirePointer* ref, SegmentBuilder* segment, ByteCount size)) {
static KJ_ALWAYS_INLINE(SegmentAnd<Text::Builder> initTextPointer(
WirePointer* ref, SegmentBuilder* segment, ByteCount size,
BuilderArena* orphanArena = nullptr)) {
// The byte list must include a NUL terminator.
ByteCount byteSize = size + 1 * BYTES;
// Allocate the space.
word* ptr = allocate(ref, segment, roundBytesUpToWords(byteSize), WirePointer::LIST);
word* ptr = allocate(
ref, segment, roundBytesUpToWords(byteSize), WirePointer::LIST, orphanArena);
// Initialize the pointer.
ref->listRef.set(FieldSize::BYTE, byteSize * (1 * ELEMENTS / BYTES));
// Build the Text::Builder. This will initialize the NUL terminator.
return Text::Builder(reinterpret_cast<char*>(ptr), size / BYTES);
return { segment, Text::Builder(reinterpret_cast<char*>(ptr), size / BYTES) };
}
static KJ_ALWAYS_INLINE(void setTextPointer(
WirePointer* ref, SegmentBuilder* segment, Text::Reader value)) {
memcpy(initTextPointer(ref, segment, value.size() * BYTES).begin(),
value.begin(), value.size());
static KJ_ALWAYS_INLINE(SegmentAnd<Text::Builder> setTextPointer(
WirePointer* ref, SegmentBuilder* segment, Text::Reader value,
BuilderArena* orphanArena = nullptr)) {
auto allocation = initTextPointer(ref, segment, value.size() * BYTES, orphanArena);
memcpy(allocation.value.begin(), value.begin(), value.size());
return allocation;
}
static KJ_ALWAYS_INLINE(Text::Builder getWritableTextPointer(
......@@ -1312,7 +1357,7 @@ struct WireHelpers {
if (defaultSize == 0 * BYTES) {
return nullptr;
} else {
Text::Builder builder = initTextPointer(ref, segment, defaultSize);
Text::Builder builder = initTextPointer(ref, segment, defaultSize).value;
memcpy(builder.begin(), defaultValue, defaultSize / BYTES);
return builder;
}
......@@ -1329,22 +1374,25 @@ struct WireHelpers {
}
}
static KJ_ALWAYS_INLINE(Data::Builder initDataPointer(
WirePointer* ref, SegmentBuilder* segment, ByteCount size)) {
static KJ_ALWAYS_INLINE(SegmentAnd<Data::Builder> initDataPointer(
WirePointer* ref, SegmentBuilder* segment, ByteCount size,
BuilderArena* orphanArena = nullptr)) {
// Allocate the space.
word* ptr = allocate(ref, segment, roundBytesUpToWords(size), WirePointer::LIST);
word* ptr = allocate(ref, segment, roundBytesUpToWords(size), WirePointer::LIST, orphanArena);
// Initialize the pointer.
ref->listRef.set(FieldSize::BYTE, size * (1 * ELEMENTS / BYTES));
// Build the Data::Builder.
return Data::Builder(reinterpret_cast<byte*>(ptr), size / BYTES);
return { segment, Data::Builder(reinterpret_cast<byte*>(ptr), size / BYTES) };
}
static KJ_ALWAYS_INLINE(void setDataPointer(
WirePointer* ref, SegmentBuilder* segment, Data::Reader value)) {
memcpy(initDataPointer(ref, segment, value.size() * BYTES).begin(),
value.begin(), value.size());
static KJ_ALWAYS_INLINE(SegmentAnd<Data::Builder> setDataPointer(
WirePointer* ref, SegmentBuilder* segment, Data::Reader value,
BuilderArena* orphanArena = nullptr)) {
auto allocation = initDataPointer(ref, segment, value.size() * BYTES, orphanArena);
memcpy(allocation.value.begin(), value.begin(), value.size());
return allocation;
}
static KJ_ALWAYS_INLINE(Data::Builder getWritableDataPointer(
......@@ -1360,7 +1408,7 @@ struct WireHelpers {
if (defaultSize == 0 * BYTES) {
return nullptr;
} else {
Data::Builder builder = initDataPointer(ref, segment, defaultSize);
Data::Builder builder = initDataPointer(ref, segment, defaultSize).value;
memcpy(builder.begin(), defaultValue, defaultSize / BYTES);
return builder;
}
......@@ -1427,11 +1475,13 @@ struct WireHelpers {
}
}
static word* setStructPointer(SegmentBuilder* segment, WirePointer* ref, StructReader value) {
static SegmentAnd<word*> setStructPointer(
SegmentBuilder* segment, WirePointer* ref, StructReader value,
BuilderArena* orphanArena = nullptr) {
WordCount dataSize = roundBitsUpToWords(value.dataSize);
WordCount totalSize = dataSize + value.pointerCount * WORDS_PER_POINTER;
word* ptr = allocate(ref, segment, totalSize, WirePointer::STRUCT);
word* ptr = allocate(ref, segment, totalSize, WirePointer::STRUCT, orphanArena);
ref->structRef.set(dataSize, value.pointerCount);
if (value.dataSize == 1 * BITS) {
......@@ -1446,15 +1496,17 @@ struct WireHelpers {
value.segment, value.pointers + i, nullptr, value.nestingLimit));
}
return ptr;
return { segment, ptr };
}
static word* setListPointer(SegmentBuilder* segment, WirePointer* ref, ListReader value) {
static SegmentAnd<word*> setListPointer(
SegmentBuilder* segment, WirePointer* ref, ListReader value,
BuilderArena* orphanArena = nullptr) {
WordCount totalSize = roundBitsUpToWords(value.elementCount * value.step);
if (value.step * ELEMENTS <= BITS_PER_WORD * WORDS) {
// List of non-structs.
word* ptr = allocate(ref, segment, totalSize, WirePointer::LIST);
word* ptr = allocate(ref, segment, totalSize, WirePointer::LIST, orphanArena);
if (value.structPointerCount == 1 * POINTERS) {
// List of pointers.
......@@ -1483,10 +1535,11 @@ struct WireHelpers {
memcpy(ptr, value.ptr, totalSize * BYTES_PER_WORD / BYTES);
}
return ptr;
return { segment, ptr };
} else {
// List of structs.
word* ptr = allocate(ref, segment, totalSize + POINTER_SIZE_IN_WORDS, WirePointer::LIST);
word* ptr = allocate(ref, segment, totalSize + POINTER_SIZE_IN_WORDS, WirePointer::LIST,
orphanArena);
ref->listRef.setInlineComposite(totalSize);
WordCount dataSize = roundBitsUpToWords(value.structDataSize);
......@@ -1512,7 +1565,7 @@ struct WireHelpers {
}
}
return ptr;
return { segment, ptr };
}
}
......@@ -2027,7 +2080,7 @@ ListBuilder StructBuilder::getStructListField(
template <>
Text::Builder StructBuilder::initBlobField<Text>(WirePointerCount ptrIndex, ByteCount size) {
return WireHelpers::initTextPointer(pointers + ptrIndex, segment, size);
return WireHelpers::initTextPointer(pointers + ptrIndex, segment, size).value;
}
template <>
void StructBuilder::setBlobField<Text>(WirePointerCount ptrIndex, Text::Reader value) {
......@@ -2042,7 +2095,7 @@ Text::Builder StructBuilder::getBlobField<Text>(
template <>
Data::Builder StructBuilder::initBlobField<Data>(WirePointerCount ptrIndex, ByteCount size) {
return WireHelpers::initDataPointer(pointers + ptrIndex, segment, size);
return WireHelpers::initDataPointer(pointers + ptrIndex, segment, size).value;
}
template <>
void StructBuilder::setBlobField<Data>(WirePointerCount ptrIndex, Data::Reader value) {
......@@ -2280,7 +2333,7 @@ ListBuilder ListBuilder::getStructListElement(ElementCount index, StructSize ele
template <>
Text::Builder ListBuilder::initBlobElement<Text>(ElementCount index, ByteCount size) {
return WireHelpers::initTextPointer(
reinterpret_cast<WirePointer*>(ptr + index * step / BITS_PER_BYTE), segment, size);
reinterpret_cast<WirePointer*>(ptr + index * step / BITS_PER_BYTE), segment, size).value;
}
template <>
void ListBuilder::setBlobElement<Text>(ElementCount index, Text::Reader value) {
......@@ -2296,7 +2349,7 @@ Text::Builder ListBuilder::getBlobElement<Text>(ElementCount index) {
template <>
Data::Builder ListBuilder::initBlobElement<Data>(ElementCount index, ByteCount size) {
return WireHelpers::initDataPointer(
reinterpret_cast<WirePointer*>(ptr + index * step / BITS_PER_BYTE), segment, size);
reinterpret_cast<WirePointer*>(ptr + index * step / BITS_PER_BYTE), segment, size).value;
}
template <>
void ListBuilder::setBlobElement<Data>(ElementCount index, Data::Reader value) {
......@@ -2435,41 +2488,20 @@ ObjectReader ListReader::getObjectElement(ElementCount index) const {
// =======================================================================================
// OrphanBuilder
// TODO(cleanup): This is hacky. In order to reuse WireHelpers in OrphanBuilder::init*() and
// OrphanBuilder::copy*(), we actually pass them pointers to WirePointers allocated on the stack.
// When this pointer is initialized, the offset may be truncated and thus end up being garbage.
// This is OK because we define the offset to be ignored in this case, but there is a fair amount
// of non-local reasoning going on. Additionally, in order to select a segment, these methods
// manually compute the size that they expect WireHelpers to allocate. This is redundant and
// could theoretically get out-of-sync with the way WireHelpers computes them, leading to subtle
// bugs. Some refactoring could make this cleaner, perhaps, but I couldn't think of a reasonably
// non-invasive approach.
OrphanBuilder OrphanBuilder::initStruct(BuilderArena* arena, StructSize size) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(size.total());
StructBuilder builder = WireHelpers::initStructPointer(result.tagAsPtr(), result.segment, size);
KJ_ASSERT(builder.segment == result.segment,
"Orphan was unexpectedly allocated in a different segment.");
StructBuilder builder = WireHelpers::initStructPointer(result.tagAsPtr(), nullptr, size, arena);
result.segment = builder.segment;
result.location = reinterpret_cast<word*>(builder.data);
return result;
}
OrphanBuilder OrphanBuilder::initList(
BuilderArena* arena, ElementCount elementCount, FieldSize elementSize) {
KJ_DREQUIRE(elementSize != FieldSize::INLINE_COMPOSITE,
"Use OrphanBuilder::initStructList() instead.");
decltype(BITS / ELEMENTS) bitsPerElement = dataBitsPerElement(elementSize) +
pointersPerElement(elementSize) * BITS_PER_POINTER;
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
WireHelpers::roundBitsUpToWords(bitsPerElement * ElementCount64(elementCount)));
ListBuilder builder =
WireHelpers::initListPointer(result.tagAsPtr(), result.segment, elementCount, elementSize);
KJ_ASSERT(builder.segment == result.segment,
"Orphan was unexpectedly allocated in a different segment.");
ListBuilder builder = WireHelpers::initListPointer(
result.tagAsPtr(), nullptr, elementCount, elementSize, arena);
result.segment = builder.segment;
result.location = reinterpret_cast<word*>(builder.ptr);
return result;
}
......@@ -2481,12 +2513,9 @@ OrphanBuilder OrphanBuilder::initStructList(
return initList(arena, elementCount, elementSize.preferredListEncoding);
} else {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
elementCount * (elementSize.total() / ELEMENTS) + POINTER_SIZE_IN_WORDS);
ListBuilder builder = WireHelpers::initStructListPointer(
result.tagAsPtr(), result.segment, elementCount, elementSize);
KJ_ASSERT(builder.segment == result.segment,
"Orphan was unexpectedly allocated in a different segment.");
result.tagAsPtr(), nullptr, elementCount, elementSize, arena);
result.segment = builder.segment;
result.location = reinterpret_cast<word*>(builder.ptr) - POINTER_SIZE_IN_WORDS;
return result;
}
......@@ -2494,78 +2523,51 @@ OrphanBuilder OrphanBuilder::initStructList(
OrphanBuilder OrphanBuilder::initText(BuilderArena* arena, ByteCount size) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
WireHelpers::roundBytesUpToWords(size + 1 * BYTES));
Text::Builder builder = WireHelpers::initTextPointer(result.tagAsPtr(), result.segment, size);
result.location = reinterpret_cast<word*>(builder.begin());
KJ_ASSERT(result.segment->getOffsetTo(result.location) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
auto allocation = WireHelpers::initTextPointer(result.tagAsPtr(), nullptr, size, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value.begin());
return result;
}
OrphanBuilder OrphanBuilder::initData(BuilderArena* arena, ByteCount size) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(WireHelpers::roundBytesUpToWords(size));
Data::Builder builder = WireHelpers::initDataPointer(result.tagAsPtr(), result.segment, size);
result.location = reinterpret_cast<word*>(builder.begin());
KJ_ASSERT(result.segment->getOffsetTo(result.location) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
auto allocation = WireHelpers::initDataPointer(result.tagAsPtr(), nullptr, size, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value.begin());
return result;
}
OrphanBuilder OrphanBuilder::copy(BuilderArena* arena, StructReader copyFrom) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
WireHelpers::roundBitsUpToWords(copyFrom.getDataSectionSize()) +
copyFrom.getPointerSectionSize() * WORDS_PER_POINTER);
word* ptr = WireHelpers::setStructPointer(result.segment, result.tagAsPtr(), copyFrom);
KJ_ASSERT(result.segment->getOffsetTo(ptr) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
result.location = reinterpret_cast<word*>(ptr);
auto allocation = WireHelpers::setStructPointer(nullptr, result.tagAsPtr(), copyFrom, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value);
return result;
}
OrphanBuilder OrphanBuilder::copy(BuilderArena* arena, ListReader copyFrom) {
OrphanBuilder result;
WordCount wordCount = WireHelpers::roundBitsUpToWords(
copyFrom.step * ElementCount64(copyFrom.elementCount));
if (copyFrom.step * ELEMENTS > BITS_PER_WORD * WORDS) {
// This is a struct list.
wordCount += 1 * WORDS;
}
result.segment = arena->getSegmentWithAvailable(wordCount);
word* ptr = WireHelpers::setListPointer(result.segment, result.tagAsPtr(), copyFrom);
KJ_ASSERT(result.segment->getOffsetTo(ptr) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
result.location = reinterpret_cast<word*>(ptr);
auto allocation = WireHelpers::setListPointer(nullptr, result.tagAsPtr(), copyFrom, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value);
return result;
}
OrphanBuilder OrphanBuilder::copy(BuilderArena* arena, Text::Reader copyFrom) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
WireHelpers::roundBytesUpToWords((copyFrom.size() + 1) * BYTES));
Text::Builder text = WireHelpers::initTextPointer(
result.tagAsPtr(), result.segment, copyFrom.size() * BYTES);
result.location = reinterpret_cast<word*>(text.begin());
KJ_ASSERT(result.segment->getOffsetTo(result.location) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
memcpy(text.begin(), copyFrom.begin(), copyFrom.size());
auto allocation = WireHelpers::setTextPointer(
result.tagAsPtr(), nullptr, copyFrom, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value.begin());
return result;
}
OrphanBuilder OrphanBuilder::copy(BuilderArena* arena, Data::Reader copyFrom) {
OrphanBuilder result;
result.segment = arena->getSegmentWithAvailable(
WireHelpers::roundBytesUpToWords(copyFrom.size() * BYTES));
Data::Builder data = WireHelpers::initDataPointer(
result.tagAsPtr(), result.segment, copyFrom.size() * BYTES);
result.location = reinterpret_cast<word*>(data.begin());
KJ_ASSERT(result.segment->getOffsetTo(result.location) <= result.segment->getSize(),
"Orphan was unexpectedly allocated in a different segment.");
memcpy(data.begin(), copyFrom.begin(), copyFrom.size());
auto allocation = WireHelpers::setDataPointer(
result.tagAsPtr(), nullptr, copyFrom, arena);
result.segment = allocation.segment;
result.location = reinterpret_cast<word*>(allocation.value.begin());
return result;
}
......
......@@ -65,7 +65,7 @@ _::StructReader MessageReader::getRootInternal() {
MessageBuilder::MessageBuilder(): allocatedArena(false) {}
MessageBuilder::~MessageBuilder() noexcept(false) {
if (allocatedArena) {
arena()->~BuilderArena();
kj::dtor(*arena());
}
}
......@@ -74,19 +74,17 @@ _::SegmentBuilder* MessageBuilder::getRootSegment() {
return arena()->getSegment(_::SegmentId(0));
} else {
static_assert(sizeof(_::BuilderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BuilderArena. Please increase it. This will break "
"ABI compatibility.");
new(arena()) _::BuilderArena(this);
"arenaSpace is too small to hold a BuilderArena. Please increase it.");
kj::ctor(*arena(), this);
allocatedArena = true;
WordCount ptrSize = 1 * POINTERS * WORDS_PER_POINTER;
_::SegmentBuilder* segment = arena()->getSegmentWithAvailable(ptrSize);
KJ_ASSERT(segment->getSegmentId() == _::SegmentId(0),
auto allocation = arena()->allocate(POINTER_SIZE_IN_WORDS);
KJ_ASSERT(allocation.segment->getSegmentId() == _::SegmentId(0),
"First allocated word of new arena was not in segment ID 0.");
word* location = segment->allocate(ptrSize);
KJ_ASSERT(location == segment->getPtrUnchecked(0 * WORDS),
KJ_ASSERT(allocation.words == allocation.segment->getPtrUnchecked(0 * WORDS),
"First allocated word of new arena was not the first word in its segment.");
return segment;
return allocation.segment;
}
}
......
......@@ -23,6 +23,7 @@
#include <kj/common.h>
#include <kj/memory.h>
#include <kj/mutex.h>
#include "common.h"
#include "layout.h"
......@@ -78,6 +79,16 @@ struct ReaderOptions {
};
class MessageReader {
// Abstract interface for an object used to read a Cap'n Proto message. Subclasses of
// MessageReader are responsible for reading the raw, flat message content. Callers should
// usually call `messageReader.getRoot<MyStructType>()` to get a `MyStructType::Reader`
// representing the root of the message, then use that to traverse the message content.
//
// Some common subclasses of `MessageReader` include `SegmentArrayMessageReader`, whose
// constructor accepts pointers to the raw data, and `StreamFdMessageReader` (from
// `serialize.h`), which reads the message from a file descriptor. One might implement other
// subclasses to handle things like reading from shared memory segments, mmap()ed files, etc.
public:
MessageReader(ReaderOptions options);
// It is suggested that subclasses take ReaderOptions as a constructor parameter, but give it a
......@@ -120,6 +131,17 @@ private:
};
class MessageBuilder {
// Abstract interface for an object used to allocate and build a message. Subclasses of
// MessageBuilder are responsible for allocating the space in which the message will be written.
// The most common subclass is `MallocMessageBuilder`, but other subclasses may be used to do
// tricky things like allocate messages in shared memory or mmap()ed files.
//
// Creating a new message ususually means allocating a new MessageBuilder (ideally on the stack)
// and then calling `messageBuilder.initRoot<MyStructType>()` to get a `MyStructType::Builder`.
// That, in turn, can be used to fill in the message content. When done, you can call
// `messageBuilder.getSegmentsForOutput()` to get a list of flat data arrays containing the
// message.
public:
MessageBuilder();
virtual ~MessageBuilder() noexcept(false);
......@@ -129,6 +151,9 @@ public:
// this is not possible. It is expected that this method will usually return more space than
// requested, and the caller should use that extra space as much as possible before allocating
// more. The returned space remains valid at least until the MessageBuilder is destroyed.
//
// Cap'n Proto will only call this once at a time, so the subclass need not worry about
// thread-safety.
template <typename RootType>
typename RootType::Builder initRoot();
......@@ -159,12 +184,18 @@ public:
Orphanage getOrphanage();
private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
// Space in which we can construct a BuilderArena. We don't use BuilderArena directly here
// because we don't want clients to have to #include arena.h, which itself includes a bunch of
// big STL headers. We don't use a pointer to a BuilderArena because that would require an
// extra malloc on every message which could be expensive when processing small messages.
void* arenaSpace[15];
bool allocatedArena = false;
// We have to initialize the arena lazily because when we do so we want to allocate the root
// pointer immediately, and this will allocate a segment, which requires a virtual function
// call on the MessageBuilder. We can't do such a call in the constructor since the subclass
// isn't constructed yet. This is kind of annoying because it means that getOrphanage() is
// not thread-safe, but that shouldn't be a huge deal...
_::BuilderArena* arena() { return reinterpret_cast<_::BuilderArena*>(arenaSpace); }
_::SegmentBuilder* getRootSegment();
......
......@@ -89,23 +89,23 @@ public:
// `getOrphanage()` method.
template <typename RootType>
Orphan<RootType> newOrphan();
Orphan<RootType> newOrphan() const;
// Allocate a new orphaned struct.
template <typename RootType>
Orphan<RootType> newOrphan(uint size);
Orphan<RootType> newOrphan(uint size) const;
// Allocate a new orphaned list or blob.
Orphan<DynamicStruct> newOrphan(StructSchema schema);
Orphan<DynamicStruct> newOrphan(StructSchema schema) const;
// Dynamically create an orphan struct with the given schema. You must
// #include <capnp/dynamic.h> to use this.
Orphan<DynamicList> newOrphan(ListSchema schema, uint size);
Orphan<DynamicList> newOrphan(ListSchema schema, uint size) const;
// Dynamically create an orphan list with the given schema. You must #include <capnp/dynamic.h>
// to use this.
template <typename Reader>
Orphan<FromReader<Reader>> newOrphanCopy(const Reader& copyFrom);
Orphan<FromReader<Reader>> newOrphanCopy(const Reader& copyFrom) const;
// Allocate a new orphaned object (struct, list, or blob) and initialize it as a copy of the
// given object.
......@@ -214,7 +214,7 @@ Orphanage Orphanage::getForMessageContaining(BuilderType builder) {
}
template <typename RootType>
Orphan<RootType> Orphanage::newOrphan() {
Orphan<RootType> Orphanage::newOrphan() const {
return Orphan<RootType>(_::OrphanBuilder::initStruct(arena, _::structSize<RootType>()));
}
......@@ -247,7 +247,7 @@ struct Orphanage::NewOrphanListImpl<Data> {
};
template <typename RootType>
Orphan<RootType> Orphanage::newOrphan(uint size) {
Orphan<RootType> Orphanage::newOrphan(uint size) const {
return Orphan<RootType>(NewOrphanListImpl<RootType>::apply(arena, size));
}
......@@ -273,7 +273,7 @@ struct Orphanage::GetInnerReader<T, Kind::BLOB> {
};
template <typename Reader>
Orphan<FromReader<Reader>> Orphanage::newOrphanCopy(const Reader& copyFrom) {
Orphan<FromReader<Reader>> Orphanage::newOrphanCopy(const Reader& copyFrom) const {
return Orphan<FromReader<Reader>>(_::OrphanBuilder::copy(
arena, GetInnerReader<FromReader<Reader>>::apply(copyFrom)));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment