Commit fe29fa08 authored by Kenton Varda

Better Arena design. Allows incremental reads.

parent 6a72d324
...@@ -34,85 +34,161 @@ Arena::~Arena() {} ...@@ -34,85 +34,161 @@ Arena::~Arena() {}
// ======================================================================================= // =======================================================================================
ReaderArena::ReaderArena( ReaderArena::ReaderArena(std::unique_ptr<ReaderContext> context)
ArrayPtr<const ArrayPtr<const word>> segments, : context(std::move(context)),
ErrorReporter* errorReporter, readLimiter(this->context->getReadLimit() * WORDS),
WordCount64 readLimit) segment0(this, SegmentId(0), this->context->getSegment(0), &readLimiter) {}
: segments(segments),
errorReporter(errorReporter),
readLimiter(readLimit) {
segmentReaders.reserve(segments.size());
uint i = 0;
for (auto segment: segments) {
segmentReaders.emplace_back(new SegmentReader(this, SegmentId(i++), segment, &readLimiter));
}
}
ReaderArena::~ReaderArena() {} ReaderArena::~ReaderArena() {}
ArrayPtr<const ArrayPtr<const word>> ReaderArena::getSegmentsForOutput() {
return segments;
}
SegmentReader* ReaderArena::tryGetSegment(SegmentId id) { SegmentReader* ReaderArena::tryGetSegment(SegmentId id) {
if (id.value >= segments.size()) { if (id == SegmentId(0)) {
if (segment0.getArray() == nullptr) {
return nullptr;
} else {
return &segment0;
}
}
// TODO: Lock a mutex so that reading is thread-safe. Take a reader lock during the first
// lookup, unlock it before calling getSegment(), then take a writer lock to update the map.
// Bleh, lazy initialization is sad.
if (moreSegments != nullptr) {
auto iter = moreSegments->find(id.value);
if (iter != moreSegments->end()) {
return iter->second.get();
}
}
ArrayPtr<const word> newSegment = context->getSegment(id.value);
if (newSegment == nullptr) {
return nullptr; return nullptr;
} else {
return segmentReaders[id.value].get();
} }
if (moreSegments == nullptr) {
// OK, the segment exists, so allocate the map.
moreSegments = std::unique_ptr<SegmentMap>(new SegmentMap);
}
std::unique_ptr<SegmentReader>* slot = &(*moreSegments)[id.value];
*slot = std::unique_ptr<SegmentReader>(new SegmentReader(this, id, newSegment, &readLimiter));
return slot->get();
} }
void ReaderArena::reportInvalidData(const char* description) { void ReaderArena::reportInvalidData(const char* description) {
errorReporter->reportError(description); context->reportError(description);
} }
void ReaderArena::reportReadLimitReached() { void ReaderArena::reportReadLimitReached() {
errorReporter->reportError("Exceeded read limit."); context->reportError("Exceeded read limit.");
} }
// ======================================================================================= // =======================================================================================
BuilderArena::BuilderArena(Allocator* allocator): allocator(allocator) {} BuilderArena::BuilderArena(std::unique_ptr<BuilderContext> context)
BuilderArena::~BuilderArena() { : context(std::move(context)), segment0(nullptr, SegmentId(0), nullptr, nullptr) {}
// TODO: This is wrong because we aren't taking into account how much of each segment is actually BuilderArena::~BuilderArena() {}
// allocated.
uint i = 0;
for (ArrayPtr<word> ptr: memory) {
// The memory array contains Array<const word> only to ease implementation of getSegmentsForOutput().
// We actually own this space and can de-constify it.
allocator->free(SegmentId(i++), ptr);
}
}
SegmentBuilder* BuilderArena::getSegment(SegmentId id) { SegmentBuilder* BuilderArena::getSegment(SegmentId id) {
return segments[id.value].get(); // This method is allowed to crash if the segment ID is not valid.
if (id == SegmentId(0)) {
return &segment0;
} else {
return moreSegments->builders[id.value - 1].get();
}
} }
SegmentBuilder* BuilderArena::getSegmentWithAvailable(WordCount minimumAvailable) { SegmentBuilder* BuilderArena::getSegmentWithAvailable(WordCount minimumAvailable) {
if (segments.empty() || segments.back()->available() < minimumAvailable) { // TODO: Mutex-locking? Do we want to allow people to build different parts of the same message
ArrayPtr<word> array = allocator->allocate( // in different threads?
SegmentId(segments.size()), minimumAvailable / WORDS);
memory.push_back(array); if (segment0.getArena() == nullptr) {
segments.push_back(std::unique_ptr<SegmentBuilder>(new SegmentBuilder( // We're allocating the first segment.
this, SegmentId(segments.size()), array, &dummyLimiter))); ArrayPtr<word> ptr = context->allocateSegment(minimumAvailable / WORDS);
// Re-allocate segment0 in-place. This is a bit of a hack, but we have not returned any
// pointers to this segment yet, so it should be fine.
segment0.~SegmentBuilder();
return new (&segment0) SegmentBuilder(this, SegmentId(0), ptr, &this->dummyLimiter);
} else {
if (segment0.available() >= minimumAvailable) {
return &segment0;
}
if (moreSegments == nullptr) {
moreSegments = std::unique_ptr<MultiSegmentState>(new MultiSegmentState());
} else {
// TODO: Check for available space in more than just the last segment. We don't want this
// to be O(n), though, so we'll need to maintain some sort of table. Complicating matters,
// we want SegmentBuilder::allocate() to be fast, so we can't update any such table when
// allocation actually happens. Instead, we could have a priority queue based on the
// last-known available size, and then re-check the size when we pop segments off it and
// shove them to the back of the queue if they have become too small.
if (moreSegments->builders.back()->available() >= minimumAvailable) {
return moreSegments->builders.back().get();
}
}
std::unique_ptr<SegmentBuilder> newBuilder = std::unique_ptr<SegmentBuilder>(
new SegmentBuilder(this, SegmentId(moreSegments->builders.size() + 1),
context->allocateSegment(minimumAvailable / WORDS), &this->dummyLimiter));
SegmentBuilder* result = newBuilder.get();
moreSegments->builders.push_back(std::move(newBuilder));
// Keep forOutput the right size so that we don't have to re-allocate during
// getSegmentsForOutput(), which callers might reasonably expect is a thread-safe method.
moreSegments->forOutput.resize(moreSegments->builders.size() + 1);
return result;
} }
return segments.back().get();
} }
ArrayPtr<const ArrayPtr<const word>> BuilderArena::getSegmentsForOutput() { ArrayPtr<const ArrayPtr<const word>> BuilderArena::getSegmentsForOutput() {
segmentsForOutput.resize(segments.size()); // We shouldn't need to lock a mutex here because if this is called multiple times simultaneously,
for (uint i = 0; i < segments.size(); i++) { // we should only be overwriting the array with the exact same data. If the number or size of
segmentsForOutput[i] = segments[i]->currentlyAllocated(); // segments is actually changing due to an activity in another thread, then the caller has a
// problem regardless of locking here.
if (moreSegments == nullptr) {
if (segment0.getArena() == nullptr) {
// We haven't actually allocated any segments yet.
return nullptr;
} else {
// We have only one segment so far.
segment0ForOutput = segment0.currentlyAllocated();
return arrayPtr(&segment0ForOutput, 1);
}
} else {
CAPNPROTO_DEBUG_ASSERT(moreSegments->forOutput.size() == moreSegments->builders.size() + 1,
"Bug in capnproto::internal::BuilderArena: moreSegments->forOutput wasn't resized "
"correctly when the last builder was added.");
ArrayPtr<ArrayPtr<const word>> result(
&moreSegments->forOutput[0], moreSegments->forOutput.size());
uint i = 0;
result[i++] = segment0.currentlyAllocated();
for (auto& builder: moreSegments->builders) {
result[i++] = builder->currentlyAllocated();
}
return result;
} }
return arrayPtr(&*segmentsForOutput.begin(), segmentsForOutput.size());
} }
SegmentReader* BuilderArena::tryGetSegment(SegmentId id) { SegmentReader* BuilderArena::tryGetSegment(SegmentId id) {
if (id.value >= segments.size()) { if (id == SegmentId(0)) {
return nullptr; if (segment0.getArena() == nullptr) {
// We haven't allocated any segments yet.
return nullptr;
} else {
return &segment0;
}
} else { } else {
return segments[id.value].get(); if (moreSegments == nullptr || id.value > moreSegments->builders.size()) {
return nullptr;
} else {
return moreSegments->builders[id.value - 1].get();
}
} }
} }
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <vector> #include <vector>
#include <memory> #include <memory>
#include <unordered_map>
#include "macros.h" #include "macros.h"
#include "type-safety.h" #include "type-safety.h"
#include "message.h" #include "message.h"
...@@ -70,15 +71,59 @@ private: ...@@ -70,15 +71,59 @@ private:
CAPNPROTO_DISALLOW_COPY(ReadLimiter); CAPNPROTO_DISALLOW_COPY(ReadLimiter);
}; };
class SegmentReader {
// Read-only handle on one message segment: it records the owning arena, the segment's ID, the
// array of words making up the segment, and a ReadLimiter.  Only declarations appear in the
// class body; the member functions are declared inline and defined out of line.
public:
inline SegmentReader(Arena* arena, SegmentId id, ArrayPtr<const word> ptr,
ReadLimiter* readLimiter);
// The parameters mirror the private members of the same names below.
CAPNPROTO_ALWAYS_INLINE(bool containsInterval(const word* from, const word* to));
// NOTE(review): by its name this reports whether the words between `from` and `to` lie inside
// this segment; the definition is not visible here, so confirm the exact bounds convention and
// whether the ReadLimiter is charged as a side effect.
inline Arena* getArena();
inline SegmentId getSegmentId();
inline const word* getStartPtr();
inline WordCount getOffsetTo(const word* ptr);
// Length of the interval from the beginning of this segment's array to `ptr`.
inline WordCount getSize();
// Number of words in this segment's array.
inline ArrayPtr<const word> getArray();
// The segment's array itself (the `ptr` member).
private:
Arena* arena;
SegmentId id;
ArrayPtr<const word> ptr;
ReadLimiter* readLimiter;
CAPNPROTO_DISALLOW_COPY(SegmentReader);
friend class SegmentBuilder;
// SegmentBuilder derives from this class; friendship gives it access to the private members
// above.
};
class SegmentBuilder: public SegmentReader {
// Writable extension of SegmentReader, constructed by BuilderArena over memory obtained from
// its context.  Only declarations appear in the class body; the member functions are declared
// inline and defined out of line.
public:
inline SegmentBuilder(BuilderArena* arena, SegmentId id, ArrayPtr<word> ptr,
ReadLimiter* readLimiter);
// Unlike SegmentReader's constructor, this takes a BuilderArena and a mutable word array.
CAPNPROTO_ALWAYS_INLINE(word* allocate(WordCount amount));
// NOTE(review): presumably hands out `amount` words from the not-yet-allocated part of the
// segment and advances `pos`; the definition is not visible here, so confirm this, including
// what is returned when the segment lacks space.
inline word* getPtrUnchecked(WordCount offset);
// NOTE(review): by its name, returns the word at `offset` without bounds checking -- confirm.
inline BuilderArena* getArena();
// Hides SegmentReader::getArena(), returning the arena as a BuilderArena*.
inline WordCount available();
// Space still available in this segment; BuilderArena compares it against the requested
// minimum when choosing a segment to allocate from.
inline ArrayPtr<const word> currentlyAllocated();
// The allocated portion of the segment; BuilderArena uses it to fill the array returned by
// getSegmentsForOutput().
private:
word* pos;
// NOTE(review): presumably the position of the next unallocated word -- confirm against the
// definitions of allocate() and available().
CAPNPROTO_DISALLOW_COPY(SegmentBuilder);
// TODO: Do we need mutex locking?
};
class Arena { class Arena {
public: public:
virtual ~Arena(); virtual ~Arena();
virtual ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput() = 0;
// Get an array of all the segments, suitable for writing out. For BuilderArena, this only
// returns the allocated portion of each segment, whereas tryGetSegment() returns something that
// includes not-yet-allocated space.
virtual SegmentReader* tryGetSegment(SegmentId id) = 0; virtual SegmentReader* tryGetSegment(SegmentId id) = 0;
// Gets the segment with the given ID, or returns nullptr if no such segment exists. // Gets the segment with the given ID, or returns nullptr if no such segment exists.
...@@ -113,27 +158,31 @@ public: ...@@ -113,27 +158,31 @@ public:
class ReaderArena final: public Arena { class ReaderArena final: public Arena {
public: public:
ReaderArena(ArrayPtr<const ArrayPtr<const word>> segments, ErrorReporter* errorReporter, ReaderArena(std::unique_ptr<ReaderContext> context);
WordCount64 readLimit);
~ReaderArena(); ~ReaderArena();
CAPNPROTO_DISALLOW_COPY(ReaderArena);
// implements Arena ------------------------------------------------ // implements Arena ------------------------------------------------
ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput() override;
SegmentReader* tryGetSegment(SegmentId id) override; SegmentReader* tryGetSegment(SegmentId id) override;
void reportInvalidData(const char* description) override; void reportInvalidData(const char* description) override;
void reportReadLimitReached() override; void reportReadLimitReached() override;
private: private:
ArrayPtr<const ArrayPtr<const word>> segments; std::unique_ptr<ReaderContext> context;
ErrorReporter* errorReporter;
ReadLimiter readLimiter; ReadLimiter readLimiter;
std::vector<std::unique_ptr<SegmentReader>> segmentReaders;
// Optimize for single-segment messages so that small messages are handled quickly.
SegmentReader segment0;
typedef std::unordered_map<uint, std::unique_ptr<SegmentReader>> SegmentMap;
std::unique_ptr<SegmentMap> moreSegments;
}; };
class BuilderArena final: public Arena { class BuilderArena final: public Arena {
public: public:
BuilderArena(Allocator* allocator); BuilderArena(std::unique_ptr<BuilderContext> context);
~BuilderArena(); ~BuilderArena();
CAPNPROTO_DISALLOW_COPY(BuilderArena);
SegmentBuilder* getSegment(SegmentId id); SegmentBuilder* getSegment(SegmentId id);
// Get the segment with the given id. Crashes or throws an exception if no such segment exists. // Get the segment with the given id. Crashes or throws an exception if no such segment exists.
...@@ -142,67 +191,30 @@ public: ...@@ -142,67 +191,30 @@ public:
// Get a segment which has at least the given amount of space available, allocating it if // Get a segment which has at least the given amount of space available, allocating it if
// necessary. Crashes or throws an exception if there is not enough memory. // necessary. Crashes or throws an exception if there is not enough memory.
ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput();
// Get an array of all the segments, suitable for writing out. This only returns the allocated
// portion of each segment, whereas tryGetSegment() returns something that includes
// not-yet-allocated space.
// TODO: Methods to deal with bundled capabilities. // TODO: Methods to deal with bundled capabilities.
// implements Arena ------------------------------------------------ // implements Arena ------------------------------------------------
ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput() override;
SegmentReader* tryGetSegment(SegmentId id) override; SegmentReader* tryGetSegment(SegmentId id) override;
void reportInvalidData(const char* description) override; void reportInvalidData(const char* description) override;
void reportReadLimitReached() override; void reportReadLimitReached() override;
private: private:
Allocator* allocator; std::unique_ptr<BuilderContext> context;
std::vector<std::unique_ptr<SegmentBuilder>> segments;
std::vector<ArrayPtr<word>> memory;
std::vector<ArrayPtr<const word>> segmentsForOutput;
ReadLimiter dummyLimiter; ReadLimiter dummyLimiter;
};
class SegmentReader {
public:
inline SegmentReader(Arena* arena, SegmentId id, ArrayPtr<const word> ptr,
ReadLimiter* readLimiter);
CAPNPROTO_ALWAYS_INLINE(bool containsInterval(const word* from, const word* to));
inline Arena* getArena();
inline SegmentId getSegmentId();
inline const word* getStartPtr();
inline WordCount getOffsetTo(const word* ptr);
inline WordCount getSize();
private:
Arena* arena;
SegmentId id;
ArrayPtr<const word> ptr;
ReadLimiter* readLimiter;
CAPNPROTO_DISALLOW_COPY(SegmentReader); SegmentBuilder segment0;
ArrayPtr<const word> segment0ForOutput;
friend class SegmentBuilder;
};
class SegmentBuilder: public SegmentReader {
public:
inline SegmentBuilder(BuilderArena* arena, SegmentId id, ArrayPtr<word> ptr,
ReadLimiter* readLimiter);
CAPNPROTO_ALWAYS_INLINE(word* allocate(WordCount amount));
inline word* getPtrUnchecked(WordCount offset);
inline BuilderArena* getArena(); struct MultiSegmentState {
std::vector<std::unique_ptr<SegmentBuilder>> builders;
inline WordCount available(); std::vector<ArrayPtr<const word>> forOutput;
};
inline ArrayPtr<const word> currentlyAllocated(); std::unique_ptr<MultiSegmentState> moreSegments;
private:
word* pos;
CAPNPROTO_DISALLOW_COPY(SegmentBuilder);
// TODO: Do we need mutex locking?
}; };
// ======================================================================================= // =======================================================================================
...@@ -241,6 +253,7 @@ inline WordCount SegmentReader::getOffsetTo(const word* ptr) { ...@@ -241,6 +253,7 @@ inline WordCount SegmentReader::getOffsetTo(const word* ptr) {
return intervalLength(this->ptr.begin(), ptr); return intervalLength(this->ptr.begin(), ptr);
} }
inline WordCount SegmentReader::getSize() { return ptr.size() * WORDS; } inline WordCount SegmentReader::getSize() { return ptr.size() * WORDS; }
inline ArrayPtr<const word> SegmentReader::getArray() { return ptr; }
// ------------------------------------------------------------------- // -------------------------------------------------------------------
......
...@@ -233,8 +233,7 @@ TEST(Encoding, AllTypes) { ...@@ -233,8 +233,7 @@ TEST(Encoding, AllTypes) {
checkMessage(builder.getRoot()); checkMessage(builder.getRoot());
checkMessage(builder.getRoot().asReader()); checkMessage(builder.getRoot().asReader());
Message<TestAllTypes>::Reader reader( Message<TestAllTypes>::Reader reader(builder.getSegmentsForOutput());
builder.getSegmentsForOutput(), 64, 1 << 30, ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
...@@ -244,15 +243,13 @@ TEST(Encoding, AllTypes) { ...@@ -244,15 +243,13 @@ TEST(Encoding, AllTypes) {
} }
TEST(Encoding, AllTypesMultiSegment) { TEST(Encoding, AllTypesMultiSegment) {
MallocAllocator allocator(0); Message<TestAllTypes>::Builder builder(newFixedWidthBuilderContext(0));
Message<TestAllTypes>::Builder builder(&allocator);
initMessage(builder.initRoot()); initMessage(builder.initRoot());
checkMessage(builder.getRoot()); checkMessage(builder.getRoot());
checkMessage(builder.getRoot().asReader()); checkMessage(builder.getRoot().asReader());
Message<TestAllTypes>::Reader reader( Message<TestAllTypes>::Reader reader(builder.getSegmentsForOutput());
builder.getSegmentsForOutput(), 64, 1 << 30, ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
} }
...@@ -260,8 +257,7 @@ TEST(Encoding, AllTypesMultiSegment) { ...@@ -260,8 +257,7 @@ TEST(Encoding, AllTypesMultiSegment) {
TEST(Encoding, Defaults) { TEST(Encoding, Defaults) {
AlignedData<1> nullRoot = {{0, 0, 0, 0, 0, 0, 0, 0}}; AlignedData<1> nullRoot = {{0, 0, 0, 0, 0, 0, 0, 0}};
ArrayPtr<const word> segments[1] = {arrayPtr(nullRoot.words, 1)}; ArrayPtr<const word> segments[1] = {arrayPtr(nullRoot.words, 1)};
Message<TestDefaults>::Reader reader(arrayPtr(segments, 1), 64, 1 << 30, Message<TestDefaults>::Reader reader(arrayPtr(segments, 1));
ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
checkMessage(Message<TestDefaults>::readTrusted(nullRoot.words)); checkMessage(Message<TestDefaults>::readTrusted(nullRoot.words));
...@@ -276,15 +272,13 @@ TEST(Encoding, DefaultInitialization) { ...@@ -276,15 +272,13 @@ TEST(Encoding, DefaultInitialization) {
checkMessage(builder.getRoot()); // second pass just reads the initialized structure checkMessage(builder.getRoot()); // second pass just reads the initialized structure
checkMessage(builder.getRoot().asReader()); checkMessage(builder.getRoot().asReader());
Message<TestDefaults>::Reader reader( Message<TestDefaults>::Reader reader(builder.getSegmentsForOutput());
builder.getSegmentsForOutput(), 64, 1 << 30, ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
} }
TEST(Encoding, DefaultInitializationMultiSegment) { TEST(Encoding, DefaultInitializationMultiSegment) {
MallocAllocator allocator(0); Message<TestDefaults>::Builder builder(newFixedWidthBuilderContext(0));
Message<TestDefaults>::Builder builder(&allocator);
checkMessage(builder.getRoot()); // first pass initializes to defaults checkMessage(builder.getRoot()); // first pass initializes to defaults
checkMessage(builder.getRoot().asReader()); checkMessage(builder.getRoot().asReader());
...@@ -292,8 +286,7 @@ TEST(Encoding, DefaultInitializationMultiSegment) { ...@@ -292,8 +286,7 @@ TEST(Encoding, DefaultInitializationMultiSegment) {
checkMessage(builder.getRoot()); // second pass just reads the initialized structure checkMessage(builder.getRoot()); // second pass just reads the initialized structure
checkMessage(builder.getRoot().asReader()); checkMessage(builder.getRoot().asReader());
Message<TestDefaults>::Reader reader( Message<TestDefaults>::Reader reader(builder.getSegmentsForOutput());
builder.getSegmentsForOutput(), 64, 1 << 30, ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
} }
...@@ -302,8 +295,7 @@ TEST(Encoding, DefaultsFromEmptyMessage) { ...@@ -302,8 +295,7 @@ TEST(Encoding, DefaultsFromEmptyMessage) {
AlignedData<1> emptyMessage = {{4, 0, 0, 0, 0, 0, 0, 0}}; AlignedData<1> emptyMessage = {{4, 0, 0, 0, 0, 0, 0, 0}};
ArrayPtr<const word> segments[1] = {arrayPtr(emptyMessage.words, 1)}; ArrayPtr<const word> segments[1] = {arrayPtr(emptyMessage.words, 1)};
Message<TestDefaults>::Reader reader(arrayPtr(segments, 1), 64, 1 << 30, Message<TestDefaults>::Reader reader(arrayPtr(segments, 1));
ThrowingErrorReporter::getDefaultInstance());
checkMessage(reader.getRoot()); checkMessage(reader.getRoot());
checkMessage(Message<TestDefaults>::readTrusted(emptyMessage.words)); checkMessage(Message<TestDefaults>::readTrusted(emptyMessage.words));
......
...@@ -75,9 +75,6 @@ template <typename T, bool b> struct FieldSizeForType<List<T, b>> { ...@@ -75,9 +75,6 @@ template <typename T, bool b> struct FieldSizeForType<List<T, b>> {
static constexpr FieldSize value = FieldSize::REFERENCE; static constexpr FieldSize value = FieldSize::REFERENCE;
}; };
template<typename T> constexpr T&& move(T& t) noexcept { return static_cast<T&&>(t); }
// Like std::move. Unfortunately, #including <utility> brings in tons of unnecessary stuff.
template <typename T> template <typename T>
class TemporaryPointer { class TemporaryPointer {
// This class is a little hack which lets us define operator->() in cases where it needs to // This class is a little hack which lets us define operator->() in cases where it needs to
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <exception> #include <exception>
#include <string> #include <string>
#include <unistd.h> #include <unistd.h>
#include <stdio.h>
namespace capnproto { namespace capnproto {
namespace internal { namespace internal {
...@@ -47,7 +48,9 @@ Exception::Exception( ...@@ -47,7 +48,9 @@ Exception::Exception(
description = "Captain Proto debug assertion failed:\n "; description = "Captain Proto debug assertion failed:\n ";
description += file; description += file;
description += ':'; description += ':';
description += line; char buf[32];
sprintf(buf, "%d", line);
description += buf;
description += ": "; description += ": ";
description += expectation; description += expectation;
description += "\n "; description += "\n ";
......
...@@ -37,15 +37,15 @@ ...@@ -37,15 +37,15 @@
#include "wire-format.h" #include "wire-format.h"
namespace capnproto { namespace capnproto {
class Allocator; class ReaderContext;
class ErrorReporter; class BuilderContext;
} }
namespace capnproto { namespace capnproto {
namespace internal { namespace internal {
// TODO: Move to message-internal.h so that this header looks nicer? // TODO: Move to message-internal.h so that this header looks nicer?
class Arena; class ReaderArena;
class BuilderArena; class BuilderArena;
struct MessageImpl { struct MessageImpl {
...@@ -56,8 +56,8 @@ struct MessageImpl { ...@@ -56,8 +56,8 @@ struct MessageImpl {
class Reader { class Reader {
public: public:
Reader(ArrayPtr<const ArrayPtr<const word>> segments, Reader(ArrayPtr<const ArrayPtr<const word>> segments);
uint recursionLimit, uint64_t readLimit, ErrorReporter* errorReporter); Reader(std::unique_ptr<ReaderContext> context);
Reader(Reader&& other) = default; Reader(Reader&& other) = default;
CAPNPROTO_DISALLOW_COPY(Reader); CAPNPROTO_DISALLOW_COPY(Reader);
~Reader(); ~Reader();
...@@ -65,15 +65,22 @@ struct MessageImpl { ...@@ -65,15 +65,22 @@ struct MessageImpl {
StructReader getRoot(const word* defaultValue); StructReader getRoot(const word* defaultValue);
private: private:
std::unique_ptr<Arena> arena;
uint recursionLimit; uint recursionLimit;
// Space in which we can construct a ReaderArena. We don't use ReaderArena directly here
// because we don't want clients to have to #include arena.h, which itself includes a bunch of
// big STL headers. We don't use a pointer to a ReaderArena because that would require an
// extra malloc on every message which could be expensive when processing small messages,
// particularly when the context itself is freelisted and so no other allocation is necessary.
void* arenaSpace[15];
ReaderArena* arena() { return reinterpret_cast<ReaderArena*>(arenaSpace); }
}; };
class Builder { class Builder {
public: public:
Builder(); Builder();
Builder(Allocator* allocator); Builder(std::unique_ptr<BuilderContext> context);
Builder(Builder&& other) = default; Builder(Builder&& other) = default;
CAPNPROTO_DISALLOW_COPY(Builder); CAPNPROTO_DISALLOW_COPY(Builder);
~Builder(); ~Builder();
...@@ -84,9 +91,17 @@ struct MessageImpl { ...@@ -84,9 +91,17 @@ struct MessageImpl {
ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput(); ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput();
private: private:
std::unique_ptr<BuilderArena> arena;
SegmentBuilder* rootSegment; SegmentBuilder* rootSegment;
// Space in which we can construct a BuilderArena. We don't use BuilderArena directly here
// because we don't want clients to have to #include arena.h, which itself includes a bunch of
// big STL headers. We don't use a pointer to a BuilderArena because that would require an
// extra malloc on every message which could be expensive when processing small messages,
// particularly when the context itself is freelisted and so no other allocation is necessary.
void* arenaSpace[15];
BuilderArena* arena() { return reinterpret_cast<BuilderArena*>(arenaSpace); }
static SegmentBuilder* allocateRoot(BuilderArena* arena); static SegmentBuilder* allocateRoot(BuilderArena* arena);
}; };
}; };
......
...@@ -26,87 +26,160 @@ ...@@ -26,87 +26,160 @@
#include "stdlib.h" #include "stdlib.h"
#include <exception> #include <exception>
#include <string> #include <string>
#include <vector>
#include <unistd.h> #include <unistd.h>
namespace capnproto { namespace capnproto {
Allocator::~Allocator() {} ReaderContext::~ReaderContext() {}
ErrorReporter::~ErrorReporter() {} BuilderContext::~BuilderContext() {}
MallocAllocator::MallocAllocator(uint preferredSegmentSizeWords) class ParseException: public std::exception {
: preferredSegmentSizeWords(preferredSegmentSizeWords) {} public:
MallocAllocator::~MallocAllocator() {} ParseException(std::string message)
: message(message) {}
~ParseException() noexcept {}
MallocAllocator* MallocAllocator::getDefaultInstance() { const char* what() const noexcept override {
static MallocAllocator defaultInstance(1024); return message.c_str();
return &defaultInstance; }
}
ArrayPtr<word> MallocAllocator::allocate(SegmentId id, uint minimumSize) { private:
uint size = std::max(minimumSize, preferredSegmentSizeWords); std::string message;
return arrayPtr(reinterpret_cast<word*>(calloc(size, sizeof(word))), size); };
}
void MallocAllocator::free(SegmentId id, ArrayPtr<word> ptr) { class DefaultReaderContext: public ReaderContext {
::free(ptr.begin()); public:
} DefaultReaderContext(ArrayPtr<const ArrayPtr<const word>> segments,
ErrorBehavior errorBehavior, uint64_t readLimit, uint nestingLimit)
: segments(segments), errorBehavior(errorBehavior), readLimit(readLimit),
nestingLimit(nestingLimit) {}
~DefaultReaderContext() {}
ArrayPtr<const word> getSegment(uint id) override {
if (id < segments.size()) {
return segments[id];
} else {
return nullptr;
}
}
StderrErrorReporter::~StderrErrorReporter() {} uint64_t getReadLimit() override {
return readLimit;
}
StderrErrorReporter* StderrErrorReporter::getDefaultInstance() { uint getNestingLimit() override {
static StderrErrorReporter defaultInstance; return nestingLimit;
return &defaultInstance; }
}
void reportError(const char* description) override {
std::string message("ERROR: Cap'n Proto parse error: ");
message += description;
message += '\n';
switch (errorBehavior) {
case ErrorBehavior::THROW_EXCEPTION:
throw ParseException(std::move(message));
break;
case ErrorBehavior::REPORT_TO_STDERR_AND_RETURN_DEFAULT:
write(STDERR_FILENO, message.data(), message.size());
break;
case ErrorBehavior::IGNORE_AND_RETURN_DEFAULT:
break;
}
}
void StderrErrorReporter::reportError(const char* description) { private:
std::string message("ERROR: Cap'n Proto parse error: "); ArrayPtr<const ArrayPtr<const word>> segments;
message += description; ErrorBehavior errorBehavior;
message += '\n'; uint64_t readLimit;
write(STDERR_FILENO, message.data(), message.size()); uint nestingLimit;
};
std::unique_ptr<ReaderContext> newReaderContext(
ArrayPtr<const ArrayPtr<const word>> segments,
ErrorBehavior errorBehavior, uint64_t readLimit, uint nestingLimit) {
return std::unique_ptr<ReaderContext>(new DefaultReaderContext(
segments, errorBehavior, readLimit, nestingLimit));
} }
class ParseException: public std::exception { class DefaultBuilderContext: public BuilderContext {
public: public:
ParseException(const char* description); DefaultBuilderContext(uint firstSegmentWords, bool enableGrowthHeursitic)
~ParseException() noexcept; : nextSize(firstSegmentWords), enableGrowthHeursitic(enableGrowthHeursitic),
firstSegment(nullptr) {}
~DefaultBuilderContext() {
free(firstSegment);
for (void* ptr: moreSegments) {
free(ptr);
}
}
const char* what() const noexcept override; ArrayPtr<word> allocateSegment(uint minimumSize) override {
uint size = std::max(minimumSize, nextSize);
private: void* result = calloc(size, sizeof(word));
std::string description; if (result == nullptr) {
}; throw std::bad_alloc();
}
ParseException::ParseException(const char* description) if (firstSegment == nullptr) {
: description(description) {} firstSegment = result;
if (enableGrowthHeursitic) nextSize = size;
} else {
moreSegments.push_back(result);
if (enableGrowthHeursitic) nextSize += size;
}
ParseException::~ParseException() noexcept {} return arrayPtr(reinterpret_cast<word*>(result), size);
}
const char* ParseException::what() const noexcept { private:
return description.c_str(); uint nextSize;
} bool enableGrowthHeursitic;
ThrowingErrorReporter::~ThrowingErrorReporter() {} // Avoid allocating the vector if there is only one segment.
void* firstSegment;
std::vector<void*> moreSegments;
};
ThrowingErrorReporter* ThrowingErrorReporter::getDefaultInstance() { std::unique_ptr<BuilderContext> newBuilderContext(uint firstSegmentWords) {
static ThrowingErrorReporter defaultInstance; return std::unique_ptr<BuilderContext>(new DefaultBuilderContext(firstSegmentWords, true));
return &defaultInstance;
} }
void ThrowingErrorReporter::reportError(const char* description) { std::unique_ptr<BuilderContext> newFixedWidthBuilderContext(uint firstSegmentWords) {
throw ParseException(description); return std::unique_ptr<BuilderContext>(new DefaultBuilderContext(firstSegmentWords, false));
} }
// ======================================================================================= // =======================================================================================
namespace internal { namespace internal {
MessageImpl::Reader::Reader(ArrayPtr<const ArrayPtr<const word>> segments, MessageImpl::Reader::Reader(ArrayPtr<const ArrayPtr<const word>> segments) {
uint recursionLimit, uint64_t readLimit, ErrorReporter* errorReporter) std::unique_ptr<ReaderContext> context = newReaderContext(segments);
: arena(new ReaderArena(segments, errorReporter, readLimit * WORDS)), recursionLimit = context->getNestingLimit();
recursionLimit(recursionLimit) {}
MessageImpl::Reader::~Reader() {} static_assert(sizeof(ReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a ReaderArena. Please increase it. This will break "
"ABI compatibility.");
new(arena()) ReaderArena(std::move(context));
}
// Constructs a Reader from a caller-supplied ReaderContext, taking ownership of the context.
// The nesting limit is read from the context in the member initializer list, which runs before
// the constructor body moves the context into the arena.
MessageImpl::Reader::Reader(std::unique_ptr<ReaderContext> context)
: recursionLimit(context->getNestingLimit()) {
// Compile-time guard: the ReaderArena is constructed in place, so it must fit in arenaSpace.
static_assert(sizeof(ReaderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a ReaderArena. Please increase it. This will break "
"ABI compatibility.");
// Placement-new the arena at the address returned by arena(), handing it the context.
// NOTE(review): arena() is declared outside this view; presumably it points into arenaSpace --
// confirm.
new(arena()) ReaderArena(std::move(context));
}
// Destroys the ReaderArena with an explicit destructor call. The constructors create the arena
// with placement new, so there is no matching delete and it has to be torn down by hand.
MessageImpl::Reader::~Reader() {
arena()->~ReaderArena();
}
StructReader MessageImpl::Reader::getRoot(const word* defaultValue) { StructReader MessageImpl::Reader::getRoot(const word* defaultValue) {
SegmentReader* segment = arena->tryGetSegment(SegmentId(0)); SegmentReader* segment = arena()->tryGetSegment(SegmentId(0));
if (segment == nullptr || if (segment == nullptr ||
!segment->containsInterval(segment->getStartPtr(), segment->getStartPtr() + 1)) { !segment->containsInterval(segment->getStartPtr(), segment->getStartPtr() + 1)) {
segment->getArena()->reportInvalidData("Message did not contain a root pointer."); segment->getArena()->reportInvalidData("Message did not contain a root pointer.");
...@@ -116,25 +189,39 @@ StructReader MessageImpl::Reader::getRoot(const word* defaultValue) { ...@@ -116,25 +189,39 @@ StructReader MessageImpl::Reader::getRoot(const word* defaultValue) {
} }
} }
MessageImpl::Builder::Builder() MessageImpl::Builder::Builder(): rootSegment(nullptr) {
: arena(new BuilderArena(MallocAllocator::getDefaultInstance())), std::unique_ptr<BuilderContext> context = newBuilderContext();
rootSegment(allocateRoot(arena.get())) {}
MessageImpl::Builder::Builder(Allocator* allocator) static_assert(sizeof(BuilderArena) <= sizeof(arenaSpace),
: arena(new BuilderArena(allocator)), "arenaSpace is too small to hold a BuilderArena. Please increase it. This will break "
rootSegment(allocateRoot(arena.get())) {} "ABI compatibility.");
MessageImpl::Builder::~Builder() {} new(arena()) BuilderArena(std::move(context));
}
// Constructs a Builder from a caller-supplied BuilderContext, taking ownership of the context.
// rootSegment is left null here; initRoot() and getRoot() allocate the root on first use.
MessageImpl::Builder::Builder(std::unique_ptr<BuilderContext> context): rootSegment(nullptr) {
// Compile-time guard: the BuilderArena is constructed in place, so it must fit in arenaSpace.
static_assert(sizeof(BuilderArena) <= sizeof(arenaSpace),
"arenaSpace is too small to hold a BuilderArena. Please increase it. This will break "
"ABI compatibility.");
// Placement-new the arena at the address returned by arena(), handing it the context.
// NOTE(review): arena() is declared outside this view; presumably it points into arenaSpace --
// confirm.
new(arena()) BuilderArena(std::move(context));
}
// Destroys the BuilderArena with an explicit destructor call. The constructors create the arena
// with placement new, so there is no matching delete and it has to be torn down by hand.
MessageImpl::Builder::~Builder() {
arena()->~BuilderArena();
}
StructBuilder MessageImpl::Builder::initRoot(const word* defaultValue) { StructBuilder MessageImpl::Builder::initRoot(const word* defaultValue) {
if (rootSegment == nullptr) rootSegment = allocateRoot(arena());
return StructBuilder::initRoot( return StructBuilder::initRoot(
rootSegment, rootSegment->getPtrUnchecked(0 * WORDS), defaultValue); rootSegment, rootSegment->getPtrUnchecked(0 * WORDS), defaultValue);
} }
StructBuilder MessageImpl::Builder::getRoot(const word* defaultValue) { StructBuilder MessageImpl::Builder::getRoot(const word* defaultValue) {
if (rootSegment == nullptr) rootSegment = allocateRoot(arena());
return StructBuilder::getRoot(rootSegment, rootSegment->getPtrUnchecked(0 * WORDS), defaultValue); return StructBuilder::getRoot(rootSegment, rootSegment->getPtrUnchecked(0 * WORDS), defaultValue);
} }
ArrayPtr<const ArrayPtr<const word>> MessageImpl::Builder::getSegmentsForOutput() { ArrayPtr<const ArrayPtr<const word>> MessageImpl::Builder::getSegmentsForOutput() {
return arena->getSegmentsForOutput(); return arena()->getSegmentsForOutput();
} }
SegmentBuilder* MessageImpl::Builder::allocateRoot(BuilderArena* arena) { SegmentBuilder* MessageImpl::Builder::allocateRoot(BuilderArena* arena) {
......
...@@ -38,21 +38,66 @@ typedef Id<uint32_t, Segment> SegmentId; ...@@ -38,21 +38,66 @@ typedef Id<uint32_t, Segment> SegmentId;
// ======================================================================================= // =======================================================================================
class Allocator { class ReaderContext {
public: public:
virtual ~Allocator(); virtual ~ReaderContext();
virtual ArrayPtr<word> allocate(SegmentId id, uint minimumSize) = 0; virtual ArrayPtr<const word> getSegment(uint id) = 0;
virtual void free(SegmentId id, ArrayPtr<word> ptr) = 0; // Gets the segment with the given ID, or returns null if no such segment exists.
virtual uint64_t getReadLimit() = 0;
virtual uint getNestingLimit() = 0;
virtual void reportError(const char* description) = 0;
};
enum class ErrorBehavior {
THROW_EXCEPTION,
REPORT_TO_STDERR_AND_RETURN_DEFAULT,
IGNORE_AND_RETURN_DEFAULT
}; };
class ErrorReporter { std::unique_ptr<ReaderContext> newReaderContext(
ArrayPtr<const ArrayPtr<const word>> segments,
ErrorBehavior errorBehavior = ErrorBehavior::THROW_EXCEPTION,
uint64_t readLimit = 64 * 1024 * 1024, uint nestingLimit = 64);
// Creates a ReaderContext pointing at the given segment list, without taking ownership of the
// segments. All arrays passed in must remain valid until the context is destroyed.
class BuilderContext {
public: public:
virtual ~ErrorReporter(); virtual ~BuilderContext();
virtual void reportError(const char* description) = 0; virtual ArrayPtr<word> allocateSegment(uint minimumSize) = 0;
// Allocates an array of at least the given number of words, throwing an exception or crashing if
// this is not possible. It is expected that this method will usually return more space than
// requested, and the caller should use that extra space as much as possible before allocating
// more. All returned space is deleted when the context is destroyed.
}; };
std::unique_ptr<BuilderContext> newBuilderContext(uint firstSegmentWords = 1024);
// Creates a BuilderContext which allocates at least the given number of words for the first
// segment, and then heuristically decides how much to allocate for subsequent segments. This
// should work well for most use cases that do not require writing messages to specific locations
// in memory. When choosing a value for firstSegmentWords, consider that:
// 1) Reading and writing messages gets slower when multiple segments are involved, so it's good
// if most messages fit in a single segment.
// 2) Unused bytes will not be written to the wire, so generally it is not a big deal to allocate
// more space than you need. It only becomes problematic if you are allocating many messages
// in parallel and thus use lots of memory, or if you allocate so much extra space that just
// zeroing it out becomes a bottleneck.
// The default has been chosen to be reasonable for most people, so don't change it unless you have
// reason to believe you need to.
std::unique_ptr<BuilderContext> newFixedWidthBuilderContext(uint preferredSegmentWords = 1024);
// Creates a BuilderContext which will always prefer to allocate segments with the given size with
// no heuristic growth. It will still allocate larger segments when the preferred size is too small
// for some single object. You can force every single object to be located in a separate segment by
// passing zero for the parameter to this function, but this isn't a good idea. This context
// implementation is probably most useful for testing purposes, where you want to verify that your
// serializer works when a message is split across segments and you want those segments to be
// somewhat predictable.
// ======================================================================================= // =======================================================================================
template <typename RootType> template <typename RootType>
...@@ -61,12 +106,17 @@ struct Message { ...@@ -61,12 +106,17 @@ struct Message {
class Reader { class Reader {
public: public:
Reader(ArrayPtr<const ArrayPtr<const word>> segments, Reader(ArrayPtr<const ArrayPtr<const word>> segments);
uint recursionLimit, uint64_t readLimit, ErrorReporter* errorReporter); // Make a Reader that reads from the given segments, as if the context were created using
Reader(Reader&& other) = default; // newReaderContext(segments).
Reader(std::unique_ptr<ReaderContext> context);
CAPNPROTO_DISALLOW_COPY(Reader); CAPNPROTO_DISALLOW_COPY(Reader);
Reader(Reader&& other) = default;
typename RootType::Reader getRoot(); typename RootType::Reader getRoot();
// Get a reader pointing to the message root.
private: private:
internal::MessageImpl::Reader internal; internal::MessageImpl::Reader internal;
...@@ -75,16 +125,19 @@ struct Message { ...@@ -75,16 +125,19 @@ struct Message {
class Builder { class Builder {
public: public:
Builder(); Builder();
// Make a Builder that allocates using malloc, using the default segment size. // Make a Builder as if with a context created by newBuilderContext().
Builder(Allocator* allocator); Builder(std::unique_ptr<BuilderContext> context);
// Make a Builder that allocates memory using the given allocator.
Builder(Builder&& other) = default;
CAPNPROTO_DISALLOW_COPY(Builder); CAPNPROTO_DISALLOW_COPY(Builder);
Builder(Builder&& other) = default;
typename RootType::Builder initRoot(); typename RootType::Builder initRoot();
// Allocate and initialize the message root. If already initialized, the old data is discarded.
typename RootType::Builder getRoot(); typename RootType::Builder getRoot();
// Get the message root, initializing it to the type's default value if it isn't initialized
// already.
ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput(); ArrayPtr<const ArrayPtr<const word>> getSegmentsForOutput();
...@@ -115,51 +168,16 @@ struct Message { ...@@ -115,51 +168,16 @@ struct Message {
// MyMessage::Reader reader = Message<MyMessage>::ReadTrusted(MyMessage::DEFAULT.words); // MyMessage::Reader reader = Message<MyMessage>::ReadTrusted(MyMessage::DEFAULT.words);
}; };
// =======================================================================================
// Standard implementations of allocators and error reporters.
class MallocAllocator: public Allocator {
public:
explicit MallocAllocator(uint preferredSegmentSizeWords);
~MallocAllocator();
static MallocAllocator* getDefaultInstance();
// implements Allocator --------------------------------------------
ArrayPtr<word> allocate(SegmentId id, uint minimumSize) override;
void free(SegmentId id, ArrayPtr<word> ptr) override;
private:
uint preferredSegmentSizeWords;
};
class StderrErrorReporter: public ErrorReporter {
public:
~StderrErrorReporter();
static StderrErrorReporter* getDefaultInstance();
// implements ErrorReporter ----------------------------------------
void reportError(const char* description) override;
};
class ThrowingErrorReporter: public ErrorReporter {
public:
~ThrowingErrorReporter();
static ThrowingErrorReporter* getDefaultInstance();
// implements ErrorReporter ----------------------------------------
void reportError(const char* description) override;
};
// ======================================================================================= // =======================================================================================
// implementation details // implementation details
template <typename RootType> template <typename RootType>
inline Message<RootType>::Reader::Reader(ArrayPtr<const ArrayPtr<const word>> segments, inline Message<RootType>::Reader::Reader(ArrayPtr<const ArrayPtr<const word>> segments)
uint recursionLimit, uint64_t readLimit, ErrorReporter* errorReporter) : internal(segments) {}
: internal(segments, recursionLimit, readLimit, errorReporter) {}
// Constructs a typed Reader from a caller-supplied ReaderContext. Ownership of the context is
// forwarded unchanged to the untyped internal::MessageImpl::Reader held in `internal`.
template <typename RootType>
inline Message<RootType>::Reader::Reader(std::unique_ptr<ReaderContext> context)
: internal(std::move(context)) {}
template <typename RootType> template <typename RootType>
inline typename RootType::Reader Message<RootType>::Reader::getRoot() { inline typename RootType::Reader Message<RootType>::Reader::getRoot() {
...@@ -171,8 +189,8 @@ inline Message<RootType>::Builder::Builder() ...@@ -171,8 +189,8 @@ inline Message<RootType>::Builder::Builder()
: internal() {} : internal() {}
template <typename RootType> template <typename RootType>
inline Message<RootType>::Builder::Builder(Allocator* allocator) inline Message<RootType>::Builder::Builder(std::unique_ptr<BuilderContext> context)
: internal(allocator) {} : internal(std::move(context)) {}
template <typename RootType> template <typename RootType>
inline typename RootType::Builder Message<RootType>::Builder::initRoot() { inline typename RootType::Builder Message<RootType>::Builder::initRoot() {
......
...@@ -51,6 +51,9 @@ struct NoInfer { ...@@ -51,6 +51,9 @@ struct NoInfer {
typedef T Type; typedef T Type;
}; };
// Casts an lvalue to an rvalue reference so that it can be moved from, like std::move.
// Provided locally because #including <utility> brings in tons of unnecessary stuff.
// Only lvalues are accepted: the parameter is a plain lvalue reference, not a forwarding
// reference.
template <typename T>
constexpr T&& move(T& value) noexcept {
  return static_cast<T&&>(value);
}
// ======================================================================================= // =======================================================================================
// ArrayPtr // ArrayPtr
...@@ -83,6 +86,9 @@ public: ...@@ -83,6 +86,9 @@ public:
return ArrayPtr(ptr + start, end - start); return ArrayPtr(ptr + start, end - start);
} }
inline bool operator==(std::nullptr_t) { return ptr == nullptr; }
inline bool operator!=(std::nullptr_t) { return ptr != nullptr; }
private: private:
T* ptr; T* ptr;
std::size_t size_; std::size_t size_;
......
...@@ -264,7 +264,7 @@ static void checkStruct(StructReader reader) { ...@@ -264,7 +264,7 @@ static void checkStruct(StructReader reader) {
} }
TEST(WireFormat, StructRoundTrip_OneSegment) { TEST(WireFormat, StructRoundTrip_OneSegment) {
BuilderArena arena(MallocAllocator::getDefaultInstance()); BuilderArena arena(newBuilderContext());
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS); SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS); word* rootLocation = segment->allocate(1 * WORDS);
...@@ -298,8 +298,7 @@ TEST(WireFormat, StructRoundTrip_OneSegment) { ...@@ -298,8 +298,7 @@ TEST(WireFormat, StructRoundTrip_OneSegment) {
} }
TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) { TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
MallocAllocator allocator(1); BuilderArena arena(newFixedWidthBuilderContext(0));
BuilderArena arena(&allocator);
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS); SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS); word* rootLocation = segment->allocate(1 * WORDS);
...@@ -334,8 +333,7 @@ TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) { ...@@ -334,8 +333,7 @@ TEST(WireFormat, StructRoundTrip_OneSegmentPerAllocation) {
} }
TEST(WireFormat, StructRoundTrip_MultipleSegmentsWithMultipleAllocations) { TEST(WireFormat, StructRoundTrip_MultipleSegmentsWithMultipleAllocations) {
MallocAllocator allocator(8); BuilderArena arena(newFixedWidthBuilderContext(8));
BuilderArena arena(&allocator);
SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS); SegmentBuilder* segment = arena.getSegmentWithAvailable(1 * WORDS);
word* rootLocation = segment->allocate(1 * WORDS); word* rootLocation = segment->allocate(1 * WORDS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment