Commit fc0c297c authored by Kenton Varda

Make kj::Arena thread-safe.

parent a45c463c
......@@ -31,7 +31,7 @@ namespace {
class TestFailingErrorReporter: public ErrorReporter {
public:
void addError(uint32_t startByte, uint32_t endByte, kj::String message) override {
void addError(uint32_t startByte, uint32_t endByte, kj::StringPtr message) override {
ADD_FAILURE() << "Parse failed: (" << startByte << "-" << endByte << ") " << message.cStr();
}
};
......@@ -1167,7 +1167,7 @@ void SchemaLoader::InitializerImpl::init(const _::RawSchema* schema) const {
// Lock the loader for read to make sure no one is concurrently loading a replacement for this
// schema node.
auto lock = loader.impl.lockForRead();
auto lock = loader.impl.lockShared();
// Get the mutable version of the schema.
_::RawSchema* mutableSchema = lock->get()->tryGet(schema->id).schema;
......@@ -1195,12 +1195,12 @@ Schema SchemaLoader::get(uint64_t id) const {
}
kj::Maybe<Schema> SchemaLoader::tryGet(uint64_t id) const {
auto getResult = impl.lockForRead()->get()->tryGet(id);
auto getResult = impl.lockShared()->get()->tryGet(id);
if (getResult.schema == nullptr || getResult.schema->lazyInitializer != nullptr) {
KJ_IF_MAYBE(c, getResult.callback) {
c->load(*this, id);
}
getResult = impl.lockForRead()->get()->tryGet(id);
getResult = impl.lockShared()->get()->tryGet(id);
}
if (getResult.schema != nullptr && getResult.schema->lazyInitializer == nullptr) {
return Schema(getResult.schema);
......@@ -1210,11 +1210,11 @@ kj::Maybe<Schema> SchemaLoader::tryGet(uint64_t id) const {
}
Schema SchemaLoader::load(const schema::Node::Reader& reader) {
return Schema(impl.lock()->get()->load(reader, false));
return Schema(impl.lockExclusive()->get()->load(reader, false));
}
Schema SchemaLoader::loadOnce(const schema::Node::Reader& reader) const {
auto locked = impl.lock();
auto locked = impl.lockExclusive();
auto getResult = locked->get()->tryGet(reader.getId());
if (getResult.schema == nullptr || getResult.schema->lazyInitializer != nullptr) {
// Doesn't exist yet, or the existing schema is a placeholder and therefore has not yet been
......@@ -1226,11 +1226,11 @@ Schema SchemaLoader::loadOnce(const schema::Node::Reader& reader) const {
}
kj::Array<Schema> SchemaLoader::getAllLoaded() const {
return impl.lockForRead()->get()->getAllLoaded();
return impl.lockShared()->get()->getAllLoaded();
}
void SchemaLoader::loadNative(const _::RawSchema* nativeSchema) {
impl.lock()->get()->loadNative(nativeSchema);
impl.lockExclusive()->get()->loadNative(nativeSchema);
}
} // namespace capnp
......@@ -197,32 +197,50 @@ TEST(Arena, Alignment) {
TEST(Arena, EndOfChunk) {
union {
byte scratch[16];
byte scratch[64];
uint64_t align;
};
Arena arena(arrayPtr(scratch, sizeof(scratch)));
// First allocation will come from somewhere in the scratch space (after the chunk header).
uint64_t& i = arena.allocate<uint64_t>();
EXPECT_EQ(scratch, reinterpret_cast<byte*>(&i));
EXPECT_GE(reinterpret_cast<byte*>(&i), scratch);
EXPECT_LT(reinterpret_cast<byte*>(&i), scratch + sizeof(scratch));
// Next allocation will come at the next position.
uint64_t& i2 = arena.allocate<uint64_t>();
EXPECT_EQ(scratch + 8, reinterpret_cast<byte*>(&i2));
EXPECT_EQ(&i + 1, &i2);
// Allocate the rest of the scratch space.
size_t spaceLeft = scratch + sizeof(scratch) - reinterpret_cast<byte*>(&i2 + 1);
ArrayPtr<byte> remaining = arena.allocateArray<byte>(spaceLeft);
EXPECT_EQ(reinterpret_cast<byte*>(&i2 + 1), remaining.begin());
// Next allocation comes from somewhere new.
uint64_t& i3 = arena.allocate<uint64_t>();
EXPECT_NE(scratch + 16, reinterpret_cast<byte*>(&i3));
EXPECT_NE(remaining.end(), reinterpret_cast<byte*>(&i3));
}
TEST(Arena, EndOfChunkAlignment) {
union {
byte scratch[10];
byte scratch[34];
uint64_t align;
};
Arena arena(arrayPtr(scratch, sizeof(scratch)));
// Figure out where we are...
byte* start = arena.allocateArray<byte>(0).begin();
// Allocate enough space so that we're 24 bytes into the scratch space. (On 64-bit systems this
// allocation should be zero bytes, since the three-pointer chunk header already occupies 24 bytes.)
arena.allocateArray<byte>(24 - (start - scratch));
// Allocating a 16-bit integer works. Now we're at 26 bytes; 8 bytes are left.
uint16_t& i = arena.allocate<uint16_t>();
EXPECT_EQ(scratch, reinterpret_cast<byte*>(&i));
EXPECT_EQ(scratch + 24, reinterpret_cast<byte*>(&i));
// Although there is technically enough space in the scratch array, it is not aligned correctly.
// Although there is technically enough space to allocate a uint64, it is not aligned correctly,
// so it will be allocated elsewhere instead.
uint64_t& i2 = arena.allocate<uint64_t>();
EXPECT_TRUE(reinterpret_cast<byte*>(&i2) < scratch ||
reinterpret_cast<byte*>(&i2) > scratch + sizeof(scratch));
......@@ -237,14 +255,19 @@ TEST(Arena, TooBig) {
byte& b2 = arena.allocate<byte>();
EXPECT_EQ(&b1 + 1, &b2);
// The array should not have been allocated anywhere near that first byte.
EXPECT_TRUE(arr.begin() < &b1 || arr.begin() > &b1 + 512);
// The next byte should have been allocated after the array.
EXPECT_EQ(arr.end(), &b2);
// Write to the array to make sure it's valid.
memset(arr.begin(), 0xbe, arr.size());
}
TEST(Arena, MultiSegment) {
Arena arena(24);
// Sorry, this test makes assumptions about the size of ChunkHeader.
Arena arena(sizeof(void*) == 4 ? 32 : 40);
uint64_t& i1 = arena.allocate<uint64_t>();
uint64_t& i2 = arena.allocate<uint64_t>();
......@@ -283,5 +306,96 @@ TEST(Arena, Strings) {
EXPECT_EQ(quux.end() + 1, corge.begin());
}
// I tried to use std::thread, but it threw a pure-virtual exception. It's unclear whether
// std::thread is meant to be fully supported in GCC 4.7.
class Thread {
public:
template <typename Func>
explicit Thread(Func&& func) {
KJ_ASSERT(pthread_create(
&thread, nullptr, &runThread<Decay<Func>>,
new Decay<Func>(kj::fwd<Func>(func))) == 0);
}
~Thread() {
KJ_ASSERT(pthread_join(thread, nullptr) == 0);
}
private:
pthread_t thread;
template <typename Func>
static void* runThread(void* ptr) {
Func* func = reinterpret_cast<Func*>(ptr);
KJ_DEFER(delete func);
(*func)();
return nullptr;
}
};
struct ThreadTestObject {
ThreadTestObject* next;
void* owner; // points into the owning thread's stack
ThreadTestObject(ThreadTestObject* next, void* owner)
: next(next), owner(owner) {}
~ThreadTestObject() { ++destructorCount; }
static uint destructorCount;
};
uint ThreadTestObject::destructorCount = 0;
TEST(Arena, Threads) {
// Test thread-safety. We allocate objects in four threads simultaneously, verify that they
// are not corrupted, then verify that their destructors are all called when the Arena is
// destroyed.
{
MutexGuarded<Arena> arena;
// Func to run in each thread.
auto threadFunc = [&]() {
int me;
ThreadTestObject* head = nullptr;
{
auto lock = arena.lockShared();
// Allocate a huge linked list.
for (uint i = 0; i < 100000; i++) {
head = &lock->allocate<ThreadTestObject>(head, &me);
}
}
// Wait until all other threads are done before verifying.
arena.lockExclusive();
// Verify that the list hasn't been corrupted.
while (head != nullptr) {
ASSERT_EQ(&me, head->owner);
head = head->next;
}
};
{
auto lock = arena.lockExclusive();
Thread thread1(threadFunc);
Thread thread2(threadFunc);
Thread thread3(threadFunc);
Thread thread4(threadFunc);
// Wait for threads to be ready.
usleep(10000);
auto release = kj::mv(lock);
// As we go out of scope, the lock will be released (since `release` is destroyed first),
// allowing all the threads to start running. We'll then join each thread.
}
EXPECT_EQ(0, ThreadTestObject::destructorCount);
}
EXPECT_EQ(400000, ThreadTestObject::destructorCount);
}
} // namespace
} // namespace kj
......@@ -27,11 +27,20 @@
namespace kj {
Arena::Arena(size_t chunkSize): state(chunkSize) {}
Arena::Arena(ArrayPtr<byte> scratch, size_t chunkSize): state(chunkSize) {
state.pos = scratch.begin();
state.chunkEnd = scratch.end();
Arena::Arena(size_t chunkSizeHint): state(kj::max(sizeof(ChunkHeader), chunkSizeHint)) {}
Arena::Arena(ArrayPtr<byte> scratch)
: state(kj::max(sizeof(ChunkHeader), scratch.size())) {
if (scratch.size() > sizeof(ChunkHeader)) {
ChunkHeader* chunk = reinterpret_cast<ChunkHeader*>(scratch.begin());
chunk->end = scratch.end();
chunk->pos = reinterpret_cast<byte*>(chunk + 1);
chunk->next = nullptr; // Never actually observed.
// Don't place the chunk in the chunk list because it's not ours to delete. Just make it the
// current chunk so that we'll allocate from it until it is empty.
state.getWithoutLock().currentChunk = chunk;
}
}
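
A minimal sketch (not part of this commit) of how a caller might hand the Arena a scratch buffer, assuming the constructor above; `scratchExample` and `buffer` are invented names for illustration.

#include <kj/arena.h>

// Small objects come out of `buffer` (after the in-place chunk header); once it fills up,
// the Arena silently falls back to heap-allocated chunks.
void scratchExample() {
  kj::byte buffer[1024];
  kj::Arena arena(kj::arrayPtr(buffer, sizeof(buffer)));

  int& n = arena.allocate<int>(42);              // lands inside `buffer`
  kj::StringPtr s = arena.copyString("hello");   // also inside `buffer` while space remains
  (void)n; (void)s;
}  // heap chunks (if any) are freed when `arena` is destroyed; `buffer` itself is left alone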
Arena::~Arena() noexcept(false) {
......@@ -39,7 +48,7 @@ Arena::~Arena() noexcept(false) {
// called. This ensures that if the first pass throws an exception, remaining objects are still
// destroyed. If the second pass throws, the program terminates, but any destructors that could
// throw should be using UnwindDetector to avoid this.
state.cleanup();
state.getWithoutLock().cleanup();
}
void Arena::State::cleanup() {
......@@ -72,67 +81,108 @@ inline byte* alignTo(byte* p, uint alignment) {
return reinterpret_cast<byte*>((i + mask) & ~mask);
}
} // namespace
inline size_t alignTo(size_t s, uint alignment) {
// Round the value up to the next aligned boundary.
void* Arena::allocateBytes(size_t amount, uint alignment, bool hasDisposer) {
// Code below depends on power-of-two alignment and header sizes.
static_assert(isPowerOfTwo(sizeof(ChunkHeader)), "sizeof(ChunkHeader) is not a power of 2.");
static_assert(isPowerOfTwo(sizeof(ObjectHeader)), "sizeof(ObjectHeader) is not a power of 2.");
KJ_DASSERT(isPowerOfTwo(alignment), alignment);
size_t mask = alignment - 1;
return (s + mask) & ~mask;
}
// Offset we must apply if the allocated space is being prefixed with these headers.
uint chunkHeaderSize = kj::max(alignment, sizeof(ChunkHeader));
uint objectHeaderSize = kj::max(alignment, sizeof(ObjectHeader));
} // namespace
void* Arena::allocateBytes(size_t amount, uint alignment, bool hasDisposer) const {
if (hasDisposer) {
amount += objectHeaderSize;
alignment = kj::max(alignment, alignof(ObjectHeader));
amount += alignTo(sizeof(ObjectHeader), alignment);
}
void* result;
byte* alignedPos = alignTo(state.pos, alignment);
byte* endPos = alignedPos + amount;
if (endPos <= state.chunkEnd) {
// There's enough space in the current chunk.
result = alignedPos;
state.pos = alignedPos + amount;
} else if (amount + chunkHeaderSize > state.chunkSize ||
state.chunkEnd - state.pos > state.chunkSize / 4) {
// This object is too big to fit in one chunk, or we'd waste more than a quarter of the chunk
// by starting a new one now. Instead, allocate the object in its own independent chunk.
ChunkHeader* newChunk = reinterpret_cast<ChunkHeader*>(operator new(chunkHeaderSize + amount));
result = reinterpret_cast<byte*>(newChunk) + chunkHeaderSize;
newChunk->next = state.chunkList;
state.chunkList = newChunk;
// We don't update state.pos and state.chunkEnd because the new chunk has no extra space but
// the old chunk might.
} else {
// Allocate a new chunk.
ChunkHeader* newChunk = reinterpret_cast<ChunkHeader*>(operator new(state.chunkSize));
result = reinterpret_cast<byte*>(newChunk) + chunkHeaderSize;
newChunk->next = state.chunkList;
state.chunkList = newChunk;
state.pos = reinterpret_cast<byte*>(result) + amount;
state.chunkEnd = reinterpret_cast<byte*>(newChunk) + state.chunkSize;
for (;;) {
ChunkHeader* chunk = __atomic_load_n(&state.getWithoutLock().currentChunk, __ATOMIC_ACQUIRE);
if (chunk == nullptr) {
// No chunks allocated yet.
result = allocateBytesFallback(amount, alignment);
break;
}
byte* pos = __atomic_load_n(&chunk->pos, __ATOMIC_RELAXED);
byte* alignedPos = alignTo(pos, alignment);
byte* endPos = alignedPos + amount;
// Careful about pointer wrapping (e.g. if the chunk is near the end of the address space).
if (chunk->end - endPos < 0) {
// Not enough space.
result = allocateBytesFallback(amount, alignment);
break;
}
// There appears to be enough space in this chunk, unless another thread stole it.
if (KJ_LIKELY(__atomic_compare_exchange_n(
&chunk->pos, &pos, endPos, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))) {
result = alignedPos;
break;
}
}
if (hasDisposer) {
// Reserve space for the ObjectHeader, but don't add it to the object list yet.
result = reinterpret_cast<byte*>(result) + objectHeaderSize;
result = alignTo(reinterpret_cast<byte*>(result) + sizeof(ObjectHeader), alignment);
}
KJ_DASSERT(reinterpret_cast<uintptr_t>(result) % alignment == 0);
return result;
}
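
The fast path above is a compare-and-swap bump allocator: load the current chunk and its `pos`, round `pos` up to the requested alignment, and CAS it forward; if another thread advanced `pos` first, retry. The standalone sketch below shows the same technique with std::atomic instead of the GCC builtins; `Chunk`, `alignUp`, and `tryBumpAllocate` are illustrative names, not part of this commit.

#include <atomic>
#include <cstddef>
#include <cstdint>

// Simplified stand-in for ChunkHeader: a bump-allocated region shared by threads.
struct Chunk {
  std::atomic<std::byte*> pos;  // first unallocated byte
  std::byte* end;               // one past the last usable byte
};

// Round `p` up to `alignment`, which must be a power of two.
inline std::byte* alignUp(std::byte* p, std::size_t alignment) {
  auto i = reinterpret_cast<std::uintptr_t>(p);
  return reinterpret_cast<std::byte*>((i + alignment - 1) & ~(alignment - 1));
}

// Returns nullptr when the chunk is full; a real arena would then take a lock and
// start a new chunk (the allocateBytesFallback() path above).
std::byte* tryBumpAllocate(Chunk& chunk, std::size_t amount, std::size_t alignment) {
  std::byte* pos = chunk.pos.load(std::memory_order_relaxed);
  for (;;) {
    std::byte* aligned = alignUp(pos, alignment);
    if (aligned > chunk.end ||
        static_cast<std::size_t>(chunk.end - aligned) < amount) {
      return nullptr;  // not enough space left
    }
    // On failure, compare_exchange_weak reloads the current `pos`, so we just loop.
    if (chunk.pos.compare_exchange_weak(pos, aligned + amount,
                                        std::memory_order_relaxed)) {
      return aligned;  // we own [aligned, aligned + amount)
    }
  }
}

Note that the committed code also loads `currentChunk` with __ATOMIC_ACQUIRE so that a chunk published by allocateBytesFallback()'s __ATOMIC_RELEASE store is fully initialized before its `pos` and `end` are read.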
StringPtr Arena::copyString(StringPtr content) {
void* Arena::allocateBytesFallback(size_t amount, uint alignment) const {
auto lock = state.lockExclusive();
// We already know that the current chunk is out of space.
alignment = kj::max(alignment, alignof(ChunkHeader));
amount += alignTo(sizeof(ChunkHeader), alignment);
while (lock->nextChunkSize < amount) {
lock->nextChunkSize *= 2;
}
byte* bytes = reinterpret_cast<byte*>(operator new(lock->nextChunkSize));
ChunkHeader* newChunk = reinterpret_cast<ChunkHeader*>(bytes);
newChunk->next = lock->chunkList;
newChunk->pos = bytes + amount;
newChunk->end = bytes + lock->nextChunkSize;
__atomic_store_n(&lock->currentChunk, newChunk, __ATOMIC_RELEASE);
lock->nextChunkSize *= 2;
byte* result = alignTo(bytes + sizeof(ChunkHeader), alignment);
lock->chunkList = newChunk;
return result;
}
StringPtr Arena::copyString(StringPtr content) const {
char* data = reinterpret_cast<char*>(allocateBytes(content.size() + 1, 1, false));
memcpy(data, content.cStr(), content.size() + 1);
return StringPtr(data, content.size());
}
void Arena::setDestructor(void* ptr, void (*destructor)(void*)) {
void Arena::setDestructor(void* ptr, void (*destructor)(void*)) const {
ObjectHeader* header = reinterpret_cast<ObjectHeader*>(ptr) - 1;
KJ_DASSERT(reinterpret_cast<uintptr_t>(header) % alignof(ObjectHeader) == 0);
header->destructor = destructor;
header->next = state.objectList;
state.objectList = header;
header->next = state.getWithoutLock().objectList;
// We can use relaxed atomics here because the object list is not actually traversed until the
// destructor, which needs to be synchronized in its own way.
while (!__atomic_compare_exchange_n(
&state.getWithoutLock().objectList, &header->next, header, true,
__ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
// Retry.
}
}
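
The loop above is the standard lock-free prepend onto an intrusive singly linked list: when __atomic_compare_exchange_n fails, it writes the freshly observed head back into header->next, so the loop body can stay empty. A hedged equivalent using std::atomic (illustrative only; `Node` and `push` are made-up names):

#include <atomic>

struct Node {   // stand-in for ObjectHeader
  Node* next;
};

void push(std::atomic<Node*>& head, Node* node) {
  node->next = head.load(std::memory_order_relaxed);
  // compare_exchange_weak stores the current head into node->next when it fails,
  // so retrying is just looping -- the same shape as the loop in setDestructor().
  while (!head.compare_exchange_weak(node->next, node, std::memory_order_relaxed)) {
    // Retry.
  }
}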
} // namespace kj
......@@ -27,40 +27,45 @@
#include "memory.h"
#include "array.h"
#include "string.h"
#include "mutex.h"
namespace kj {
class Arena {
// A class which allows several objects to be allocated in contiguous chunks of memory, then
// frees them all at once.
//
// Allocating from the same Arena in multiple threads concurrently is safe but not particularly
// performant due to contention. The class could be optimized in the future to use per-thread
// chunks to solve this.
public:
Arena(size_t chunkSize = 1024);
// Create an Arena that tries to allocate in chunks of the given size. It may deviate from the
// size if an object is too large to fit or to avoid excessive fragmentation. Each chunk has
// a one-word header which is included in the chunk size.
explicit Arena(size_t chunkSizeHint = 1024);
// Create an Arena. `chunkSizeHint` hints at where to start when allocating chunks, but is only
// a hint -- the Arena will, for example, allocate progressively larger chunks as time goes on,
// in order to reduce overall allocation overhead.
Arena(ArrayPtr<byte> scratch, size_t chunkSize = 1024);
explicit Arena(ArrayPtr<byte> scratch);
// Allocates from the given scratch space first, only resorting to the heap when it runs out.
KJ_DISALLOW_COPY(Arena);
~Arena() noexcept(false);
template <typename T, typename... Params>
T& allocate(Params&&... params);
T& allocate(Params&&... params) const;
template <typename T>
ArrayPtr<T> allocateArray(size_t size);
ArrayPtr<T> allocateArray(size_t size) const;
// Allocate an object or array of type T. If T has a non-trivial destructor, that destructor
// will be run during the Arena's destructor. Such destructors are run in opposite order of
// allocation. Note that these methods must maintain a list of destructors to call, which has
// overhead, but this overhead only applies if T has a non-trivial destructor.
template <typename T, typename... Params>
Own<T> allocateOwn(Params&&... params);
Own<T> allocateOwn(Params&&... params) const;
template <typename T>
Array<T> allocateOwnArray(size_t size);
Array<T> allocateOwnArray(size_t size) const;
template <typename T>
ArrayBuilder<T> allocateOwnArrayBuilder(size_t capacity);
ArrayBuilder<T> allocateOwnArrayBuilder(size_t capacity) const;
// Allocate an object or array of type T. Destructors are executed when the returned Own<T>
// or Array<T> goes out-of-scope, which must happen before the Arena is destroyed. This variant
// is useful when you need to control when the destructor is called. This variant also avoids
......@@ -68,16 +73,18 @@ public:
// slightly more efficient.
template <typename T>
inline T& copy(T&& value) { return allocate<Decay<T>>(kj::fwd<T>(value)); }
inline T& copy(T&& value) const { return allocate<Decay<T>>(kj::fwd<T>(value)); }
// Allocate a copy of the given value in the arena. This is just a shortcut for calling the
// type's copy (or move) constructor.
StringPtr copyString(StringPtr content);
StringPtr copyString(StringPtr content) const;
// Make a copy of the given string inside the arena, and return a pointer to the copy.
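
To make the ownership rules above concrete, here is a small usage sketch (an assumed example, not from this commit; the local names are invented):

void arenaApiExample() {
  kj::Arena arena;  // heap-backed; chunk size hint defaults to 1024

  int& n = arena.allocate<int>(123);                       // lives until `arena` is destroyed
  kj::ArrayPtr<char> buf = arena.allocateArray<char>(64);  // trivial destructor, so no bookkeeping
  kj::Own<kj::String> owned = arena.allocateOwn<kj::String>(kj::heapString("hi"));
  kj::StringPtr copy = arena.copyString("example");        // copied into arena-owned memory

  // `owned` is destroyed as soon as it goes out of scope; `n`, `buf`, and `copy` remain valid
  // until `arena` itself is destroyed.
  (void)n; (void)buf; (void)copy;
}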
private:
struct ChunkHeader {
ChunkHeader* next;
byte* pos; // first unallocated byte in this chunk
byte* end; // end of this chunk
};
struct ObjectHeader {
void (*destructor)(void*);
......@@ -85,15 +92,15 @@ private:
};
struct State {
size_t chunkSize;
size_t nextChunkSize;
ChunkHeader* chunkList;
ObjectHeader* objectList;
byte* pos;
byte* chunkEnd;
mutable ObjectHeader* objectList;
inline State(size_t chunkSize)
: chunkSize(chunkSize), chunkList(nullptr), objectList(nullptr),
pos(nullptr), chunkEnd(nullptr) {}
ChunkHeader* currentChunk;
inline State(size_t nextChunkSize)
: nextChunkSize(nextChunkSize), chunkList(nullptr),
objectList(nullptr), currentChunk(nullptr) {}
inline ~State() noexcept(false) { cleanup(); }
void cleanup();
......@@ -101,13 +108,16 @@ private:
// left in a consistent state, such that if cleanup() is called again, it will pick up where
// it left off.
};
State state;
MutexGuarded<State> state;
void* allocateBytes(size_t amount, uint alignment, bool hasDisposer);
void* allocateBytes(size_t amount, uint alignment, bool hasDisposer) const;
// Allocate the given number of bytes. `hasDisposer` must be true if `setDestructor()` may be
// called on this pointer later.
void setDestructor(void* ptr, void (*destructor)(void*));
void* allocateBytesFallback(size_t amount, uint alignment) const;
// Fallback used when the current chunk is out of space.
void setDestructor(void* ptr, void (*destructor)(void*)) const;
// Schedule the given destructor to be executed when the Arena is destroyed. `ptr` must be a
// pointer previously returned by an `allocateBytes()` call for which `hasDisposer` was true.
......@@ -130,7 +140,7 @@ private:
// Inline implementation details
template <typename T, typename... Params>
T& Arena::allocate(Params&&... params) {
T& Arena::allocate(Params&&... params) const {
T& result = *reinterpret_cast<T*>(allocateBytes(
sizeof(T), alignof(T), !__has_trivial_destructor(T)));
if (!__has_trivial_constructor(T) || sizeof...(Params) > 0) {
......@@ -143,7 +153,7 @@ T& Arena::allocate(Params&&... params) {
}
template <typename T>
ArrayPtr<T> Arena::allocateArray(size_t size) {
ArrayPtr<T> Arena::allocateArray(size_t size) const {
if (__has_trivial_destructor(T)) {
ArrayPtr<T> result =
arrayPtr(reinterpret_cast<T*>(allocateBytes(
......@@ -179,7 +189,7 @@ ArrayPtr<T> Arena::allocateArray(size_t size) {
}
template <typename T, typename... Params>
Own<T> Arena::allocateOwn(Params&&... params) {
Own<T> Arena::allocateOwn(Params&&... params) const {
T& result = *reinterpret_cast<T*>(allocateBytes(sizeof(T), alignof(T), false));
if (!__has_trivial_constructor(T) || sizeof...(Params) > 0) {
ctor(result, kj::fwd<Params>(params)...);
......@@ -188,7 +198,7 @@ Own<T> Arena::allocateOwn(Params&&... params) {
}
template <typename T>
Array<T> Arena::allocateOwnArray(size_t size) {
Array<T> Arena::allocateOwnArray(size_t size) const {
ArrayBuilder<T> result = allocateOwnArrayBuilder<T>(size);
for (size_t i = 0; i < size; i++) {
result.add();
......@@ -197,7 +207,7 @@ Array<T> Arena::allocateOwnArray(size_t size) {
}
template <typename T>
ArrayBuilder<T> Arena::allocateOwnArrayBuilder(size_t capacity) {
ArrayBuilder<T> Arena::allocateOwnArrayBuilder(size_t capacity) const {
return ArrayBuilder<T>(
reinterpret_cast<T*>(allocateBytes(sizeof(T) * capacity, alignof(T), false)),
capacity, DestructorOnlyArrayDisposer::instance);
......@@ -63,11 +63,11 @@ TEST(Mutex, MutexGuarded) {
MutexGuarded<uint> value(123);
{
Locked<uint> lock = value.lock();
Locked<uint> lock = value.lockExclusive();
EXPECT_EQ(123, *lock);
Thread thread([&]() {
Locked<uint> threadLock = value.lock();
Locked<uint> threadLock = value.lockExclusive();
EXPECT_EQ(456, *threadLock);
*threadLock = 789;
});
......@@ -78,13 +78,13 @@ TEST(Mutex, MutexGuarded) {
auto earlyRelease = kj::mv(lock);
}
EXPECT_EQ(789, *value.lock());
EXPECT_EQ(789, *value.lockExclusive());
{
auto rlock1 = value.lockForRead();
auto rlock1 = value.lockShared();
Thread thread2([&]() {
Locked<uint> threadLock = value.lock();
Locked<uint> threadLock = value.lockExclusive();
*threadLock = 321;
});
......@@ -92,11 +92,11 @@ TEST(Mutex, MutexGuarded) {
EXPECT_EQ(789, *rlock1);
{
auto rlock2 = value.lockForRead();
auto rlock2 = value.lockShared();
EXPECT_EQ(789, *rlock2);
auto rlock3 = value.lockForRead();
auto rlock3 = value.lockShared();
EXPECT_EQ(789, *rlock3);
auto rlock4 = value.lockForRead();
auto rlock4 = value.lockShared();
EXPECT_EQ(789, *rlock4);
}
......@@ -105,7 +105,7 @@ TEST(Mutex, MutexGuarded) {
auto earlyRelease = kj::mv(rlock1);
}
EXPECT_EQ(321, *value.lock());
EXPECT_EQ(321, *value.lockExclusive());
}
TEST(Mutex, Lazy) {
......@@ -50,15 +50,18 @@ Mutex::~Mutex() {
KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}
void Mutex::lock() {
KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
}
void Mutex::readLock() {
KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
void Mutex::lock(Exclusivity exclusivity) {
switch (exclusivity) {
case EXCLUSIVE:
KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
break;
case SHARED:
KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
break;
}
}
void Mutex::unlock(bool lockedForRead) {
void Mutex::unlock(Exclusivity exclusivity) {
KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex));
}
......@@ -47,9 +47,13 @@ public:
~Mutex();
KJ_DISALLOW_COPY(Mutex);
void lock();
void readLock();
void unlock(bool lockedForRead);
enum Exclusivity {
EXCLUSIVE,
SHARED
};
void lock(Exclusivity exclusivity);
void unlock(Exclusivity exclusivity);
private:
mutable pthread_rwlock_t mutex;
......@@ -97,7 +101,9 @@ public:
other.mutex = nullptr;
other.ptr = nullptr;
}
inline ~Locked() { if (mutex != nullptr) mutex->unlock(isConst<T>()); }
inline ~Locked() {
if (mutex != nullptr) mutex->unlock(isConst<T>() ? _::Mutex::SHARED : _::Mutex::EXCLUSIVE);
}
inline Locked& operator=(Locked&& other) {
if (mutex != nullptr) mutex->unlock(isConst<T>());
......@@ -139,8 +145,8 @@ public:
explicit MutexGuarded(Params&&... params);
// Initialize the mutex-guarded object by passing the given parameters to its constructor.
Locked<T> lock() const;
// Locks the mutex and returns the guarded object. The returned `Locked<T>` can be passed by
Locked<T> lockExclusive() const;
// Exclusively locks the object and returns it. The returned `Locked<T>` can be passed by
// move, similar to `Own<T>`.
//
// This method is declared `const` in accordance with KJ style rules which say that constness
......@@ -149,9 +155,9 @@ public:
// be shared between threads, its methods should be const, even though locking it produces a
// non-const pointer to the contained object.
Locked<const T> lockForRead() const;
// Lock the value for read-only access. Multiple read-only locks can be taken concurrently, as
// long as there are no writers.
Locked<const T> lockShared() const;
// Lock the value for shared access. Multiple shared locks can be taken concurrently, but cannot
// be held at the same time as a non-shared lock.
inline const T& getWithoutLock() const { return value; }
inline T& getWithoutLock() { return value; }
......@@ -205,14 +211,14 @@ inline MutexGuarded<T>::MutexGuarded(Params&&... params)
: value(kj::fwd<Params>(params)...) {}
template <typename T>
inline Locked<T> MutexGuarded<T>::lock() const {
mutex.lock();
inline Locked<T> MutexGuarded<T>::lockExclusive() const {
mutex.lock(_::Mutex::EXCLUSIVE);
return Locked<T>(mutex, value);
}
template <typename T>
inline Locked<const T> MutexGuarded<T>::lockForRead() const {
mutex.readLock();
inline Locked<const T> MutexGuarded<T>::lockShared() const {
mutex.lock(_::Mutex::SHARED);
return Locked<const T>(mutex, value);
}
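
A short sketch of the renamed API in use (illustrative only; `counter`, `writer`, and `reader` are invented names):

#include <kj/mutex.h>

kj::MutexGuarded<int> counter(0);

void writer() {
  kj::Locked<int> lock = counter.lockExclusive();  // exclusive: blocks all other lock holders
  ++*lock;
}  // released when `lock` is destroyed

int reader() {
  kj::Locked<const int> lock = counter.lockShared();  // shared: other readers may hold it too
  return *lock;
}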