Commit d3f4e7ba authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Implement MutexGuarded::when() to wait for a condition.

Currently only implemented for Linux (when using futexes), since that's what I use. Will expand to other platforms later.
parent a74e502b
......@@ -23,6 +23,7 @@
#include "debug.h"
#include "thread.h"
#include <kj/compat/gtest.h>
#include <stdlib.h>
#if _WIN32
#define NOGDI // NOGDI is needed to make EXPECT_EQ(123u, *lock) compile for some reason
......@@ -118,6 +119,59 @@ TEST(Mutex, MutexGuarded) {
EXPECT_EQ(321u, value.getWithoutLock());
}
#if KJ_USE_FUTEX // TODO(soon): Implement on pthread & win32
TEST(Mutex, When) {
MutexGuarded<uint> value(123);
{
uint m = value.when([](uint n) { return n < 200; }, [](uint& n) {
++n;
return n + 2;
});
KJ_EXPECT(m == 126);
KJ_EXPECT(*value.lockShared() == 124);
}
{
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
uint m = value.when([](uint n) { return n > 200; }, [](uint& n) {
++n;
return n + 2;
});
KJ_EXPECT(m == 324);
KJ_EXPECT(*value.lockShared() == 322);
}
{
// Stress test. 100 threads each wait for a value and then set the next value.
*value.lockExclusive() = 0;
auto threads = kj::heapArrayBuilder<kj::Own<kj::Thread>>(100);
for (auto i: kj::zeroTo(100)) {
threads.add(kj::heap<kj::Thread>([i,&value]() {
if (i % 2 == 0) delay();
uint m = value.when([i](const uint& n) { return n == i; },
[i](uint& n) { return n++; });
KJ_ASSERT(m == i);
}));
}
uint m = value.when([](uint n) { return n == 100; }, [](uint& n) {
return n++;
});
KJ_EXPECT(m == 100);
KJ_EXPECT(*value.lockShared() == 101);
}
}
#endif
TEST(Mutex, Lazy) {
Lazy<uint> lazy;
volatile bool initStarted = false;
......
......@@ -105,10 +105,40 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
struct Mutex::Waiter {
kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev;
Predicate& predicate;
uint futex;
};
void Mutex::unlock(Exclusivity exclusivity) {
switch (exclusivity) {
case EXCLUSIVE: {
KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");
// First check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (waiter->predicate.check()) {
// This waiter's predicate now evaluates true, so wake it up.
__atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
// We transferred ownership of the lock to this waiter, so we're done now.
return;
}
} else {
// No more waiters.
break;
}
}
// Didn't wake any waiters, so wake normally.
uint oldState = __atomic_fetch_and(
&futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);
......@@ -154,6 +184,40 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate) {
lock(EXCLUSIVE);
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, 0 };
*waitersTail = waiter;
waitersTail = &waiter.next;
KJ_DEFER({
// Remove from list.
*waiter.prev = waiter.next;
KJ_IF_MAYBE(next, waiter.next) {
next->prev = waiter.prev;
} else {
KJ_DASSERT(waitersTail == &waiter.next);
waitersTail = waiter.prev;
}
});
if (!predicate.check()) {
unlock(EXCLUSIVE);
// Wait for someone to set out futex to 1.
while (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) == 0) {
syscall(SYS_futex, &waiter.futex, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
}
// Ownership of an exclusive lock was transferred to us. We can continue.
#ifdef KJ_DEBUG
assertLockedByCaller(EXCLUSIVE);
#endif
}
}
void Once::runOnce(Initializer& init) {
startOver:
uint state = UNINITIALIZED;
......
......@@ -67,6 +67,16 @@ public:
// non-trivial, assert that the mutex is locked (which should be good enough to catch problems
// in unit tests). In non-debug builds, do nothing.
#if KJ_USE_FUTEX // TODO(soon): Implement on pthread & win32
class Predicate {
public:
virtual bool check() = 0;
};
void lockWhen(Predicate& predicate);
// Lock (exclusively) when predicate.check() returns true.
#endif
private:
#if KJ_USE_FUTEX
uint futex;
......@@ -80,6 +90,11 @@ private:
static constexpr uint EXCLUSIVE_REQUESTED = 1u << 30;
static constexpr uint SHARED_COUNT_MASK = EXCLUSIVE_REQUESTED - 1;
struct Waiter;
kj::Maybe<Waiter&> waitersHead = nullptr;
kj::Maybe<Waiter&>* waitersTail = &waitersHead;
// linked list of waitUntil()s; can only modify under lock
#elif _WIN32
uintptr_t srwLock; // Actually an SRWLOCK, but don't want to #include <windows.h> in header.
......@@ -249,6 +264,39 @@ public:
inline T& getAlreadyLockedExclusive() const;
// Like `getWithoutLock()`, but asserts that the lock is already held by the calling thread.
#if KJ_USE_FUTEX // TODO(soon): Implement on pthread & win32
template <typename Cond, typename Func>
auto when(Cond&& condition, Func&& callback) const -> decltype(callback(instance<T&>())) {
// Waits until condition(state) returns true, then calls callback(state) under lock.
//
// `condition`, when called, receives as its parameter a const reference to the state, which is
// locked (either shared or exclusive). `callback` returns a mutable reference, which is
// exclusively locked.
//
// `condition()` may be called multiple times, from multiple threads, while waiting for the
// condition to become true. It may even return true once, but then be called more times.
// It is guaranteed, though, that at the time `callback()` is finally called, `condition()`
// would currently return true (assuming it is a pure function of the guarded data).
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
return condition(value);
}
Cond&& condition;
const T& value;
PredicateImpl(Cond&& condition, const T& value)
: condition(kj::fwd<Cond>(condition)), value(value) {}
};
PredicateImpl impl(kj::fwd<Cond>(condition), value);
mutex.lockWhen(impl);
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
return callback(value);
}
#endif
private:
mutable _::Mutex mutex;
mutable T value;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment