Commit c5bed0d2 authored by Kenton Varda's avatar Kenton Varda

Eliminate the ability to have multiple threads working on building the same…

Eliminate the ability to have multiple threads working on building the same message -- performance penalty is too large, and applies even to single-threaded users.
parent e422fc1d
......@@ -181,8 +181,7 @@ SegmentBuilder* BasicBuilderArena::getSegment(SegmentId id) {
if (id == SegmentId(0)) {
return &segment0;
} else {
auto lock = moreSegments.lockShared();
KJ_IF_MAYBE(s, *lock) {
KJ_IF_MAYBE(s, moreSegments) {
KJ_REQUIRE(id.value - 1 < s->get()->builders.size(), "invalid segment id", id.value);
// TODO(cleanup): Return a const SegmentBuilder and tediously constify all SegmentBuilder
// pointers throughout the codebase.
......@@ -213,9 +212,8 @@ BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount)
// Need to fall back to additional segments.
auto lock = moreSegments.lockExclusive();
MultiSegmentState* segmentState;
KJ_IF_MAYBE(s, *lock) {
KJ_IF_MAYBE(s, moreSegments) {
// TODO(perf): Check for available space in more than just the last segment. We don't
// want this to be O(n), though, so we'll need to maintain some sort of table. Complicating
// matters, we want SegmentBuilders::allocate() to be fast, so we can't update any such
......@@ -231,7 +229,7 @@ BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount)
} else {
auto newSegmentState = kj::heap<MultiSegmentState>();
segmentState = newSegmentState;
*lock = kj::mv(newSegmentState);
moreSegments = kj::mv(newSegmentState);
}
kj::Own<BasicSegmentBuilder> newBuilder = kj::heap<BasicSegmentBuilder>(
......@@ -251,12 +249,12 @@ BasicBuilderArena::AllocateResult BasicBuilderArena::allocate(WordCount amount)
}
kj::ArrayPtr<const kj::ArrayPtr<const word>> BasicBuilderArena::getSegmentsForOutput() {
// We shouldn't need to lock a mutex here because if this is called multiple times simultaneously,
// we should only be overwriting the array with the exact same data. If the number or size of
// segments is actually changing due to an activity in another thread, then the caller has a
// problem regardless of locking here.
// Although this is a read-only method, we shouldn't need to lock a mutex here because if this
// is called multiple times simultaneously, we should only be overwriting the array with the
// exact same data. If the number or size of segments is actually changing due to an activity
// in another thread, then the caller has a problem regardless of locking here.
KJ_IF_MAYBE(segmentState, moreSegments.getWithoutLock()) {
KJ_IF_MAYBE(segmentState, moreSegments) {
KJ_DASSERT(segmentState->get()->forOutput.size() == segmentState->get()->builders.size() + 1,
"segmentState->forOutput wasn't resized correctly when the last builder was added.",
segmentState->get()->forOutput.size(), segmentState->get()->builders.size());
......@@ -290,8 +288,7 @@ SegmentReader* BasicBuilderArena::tryGetSegment(SegmentId id) {
return &segment0;
}
} else {
auto lock = moreSegments.lockShared();
KJ_IF_MAYBE(segmentState, *lock) {
KJ_IF_MAYBE(segmentState, moreSegments) {
if (id.value <= segmentState->get()->builders.size()) {
// TODO(cleanup): Return a const SegmentReader and tediously constify all SegmentBuilder
// pointers throughout the codebase.
......@@ -346,14 +343,13 @@ SegmentBuilder* ImbuedBuilderArena::imbue(SegmentBuilder* baseSegment) {
}
result = &segment0;
} else {
auto lock = moreSegments.lockExclusive();
MultiSegmentState* segmentState;
KJ_IF_MAYBE(s, *lock) {
KJ_IF_MAYBE(s, moreSegments) {
segmentState = *s;
} else {
auto newState = kj::heap<MultiSegmentState>();
segmentState = newState;
*lock = kj::mv(newState);
moreSegments = kj::mv(newState);
}
auto id = baseSegment->getSegmentId().value;
......
......@@ -232,6 +232,13 @@ private:
typedef std::unordered_map<uint, kj::Own<SegmentReader>> SegmentMap;
kj::MutexGuarded<kj::Maybe<kj::Own<SegmentMap>>> moreSegments;
// We need to mutex-guard the segment map because we lazily initialize segments when they are
// first requested, but a Reader is allowed to be used concurrently in multiple threads. Luckily
// this only applies to large messages.
//
// TODO(perf): Thread-local thing instead? Some kind of lockless map? Or do sharing of data
// in a different way, where you have to construct a new MessageReader in each thread (but
// possibly backed by the same data)?
};
class ImbuedReaderArena final: public Arena {
......@@ -323,7 +330,7 @@ private:
std::vector<kj::Own<BasicSegmentBuilder>> builders;
std::vector<kj::ArrayPtr<const word>> forOutput;
};
kj::MutexGuarded<kj::Maybe<kj::Own<MultiSegmentState>>> moreSegments;
kj::Maybe<kj::Own<MultiSegmentState>> moreSegments;
};
class ImbuedBuilderArena final: public BuilderArena {
......@@ -358,7 +365,7 @@ private:
struct MultiSegmentState {
std::vector<kj::Maybe<kj::Own<ImbuedSegmentBuilder>>> builders;
};
kj::MutexGuarded<kj::Maybe<kj::Own<MultiSegmentState>>> moreSegments;
kj::Maybe<kj::Own<MultiSegmentState>> moreSegments;
};
// =======================================================================================
......@@ -422,22 +429,13 @@ inline SegmentBuilder::SegmentBuilder(
: SegmentReader(arena, id, ptr, readLimiter), pos(pos) {}
inline word* SegmentBuilder::allocate(WordCount amount) {
word* result = __atomic_fetch_add(pos, amount * BYTES_PER_WORD / BYTES, __ATOMIC_RELAXED);
// Careful about pointer arithmetic here. The segment might be at the end of the address space,
// or `amount` could be ridiculously huge.
if (ptr.end() - (result + amount) < 0) {
if (intervalLength(*pos, ptr.end()) < amount) {
// Not enough space in the segment for this allocation.
if (ptr.end() - result >= 0) {
// It was our increment that pushed the pointer past the end of the segment. Therefore no
// other thread could have accidentally allocated space in this segment in the meantime.
// We need to back up the pointer so that it will be correct when the data is written out
// (and also so that another allocation can potentially use the remaining space).
__atomic_store_n(pos, result, __ATOMIC_RELAXED);
}
return nullptr;
} else {
// Success.
word* result = *pos;
*pos = *pos + amount;
return result;
}
}
......
......@@ -153,7 +153,7 @@ public:
private:
CapInjectorBase* injector; // becomes null once arena is allocated
void* arenaSpace[12 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
void* arenaSpace[13];
_::ImbuedBuilderArena& arena() { return *reinterpret_cast<_::ImbuedBuilderArena*>(arenaSpace); }
......
......@@ -24,7 +24,6 @@
#include <capnp/test-import.capnp.h>
#include <capnp/test-import2.capnp.h>
#include "message.h"
#include <kj/thread.h>
#include <kj/debug.h>
#include <gtest/gtest.h>
#include "test-util.h"
......@@ -1497,54 +1496,6 @@ TEST(Encoding, Has) {
EXPECT_TRUE(root.asReader().hasInt32List());
}
TEST(Encoding, Threads) {
// Use fixed-size segments so that many segments are allocated during the test.
MallocMessageBuilder message(1024, AllocationStrategy::FIXED_SIZE);
kj::MutexGuarded<Orphanage> orphanage(message.getOrphanage());
auto outerLock = orphanage.lockExclusive();
auto threadFunc = [&]() {
int dummy;
uint64_t me = reinterpret_cast<uintptr_t>(&dummy);
{
// Make sure all threads start at the same time.
auto lock = orphanage.lockShared();
// Allocate space for a list. This will always end up allocating a new segment.
auto list = lock->newOrphan<List<List<uint64_t>>>(10000);
auto builder = list.get();
// Allocate a bunch of smaller lists and initialize them to values specific to this thread.
for (uint i = 0; i < builder.size(); i++) {
builder.set(i, {me, me + 1, me + 2, me + 3});
}
// Check that none of the values were corrupted.
for (auto item: list.getReader()) {
ASSERT_EQ(4u, item.size());
EXPECT_EQ(me, item[0]);
EXPECT_EQ(me + 1, item[1]);
EXPECT_EQ(me + 2, item[2]);
EXPECT_EQ(me + 3, item[3]);
}
}
};
kj::Thread thread1(threadFunc);
kj::Thread thread2(threadFunc);
kj::Thread thread3(threadFunc);
kj::Thread thread4(threadFunc);
usleep(10000);
auto releaseLock = kj::mv(outerLock);
// On the way out, we'll release the lock, thus allowing the threads to start, then we'll join
// each thread, thus waiting for them all to complete.
}
TEST(Encoding, Constants) {
EXPECT_EQ(VOID, test::TestConstants::VOID_CONST);
EXPECT_EQ(true, test::TestConstants::BOOL_CONST);
......
......@@ -190,7 +190,7 @@ public:
Orphanage getOrphanage();
private:
void* arenaSpace[15 + sizeof(kj::MutexGuarded<void*>) / sizeof(void*)];
void* arenaSpace[16];
// Space in which we can construct a BuilderArena. We don't use BuilderArena directly here
// because we don't want clients to have to #include arena.h, which itself includes a bunch of
// big STL headers. We don't use a pointer to a BuilderArena because that would require an
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment