Commit 4a768c2f authored by Kenton Varda's avatar Kenton Varda

Implement MutexGuarded::when() for pthreads.

I originally left this unimplemented because pthreads annoyingly doesn't support condvar on top of rwlocks. It turns out there's a trick that can be used involving an extra mutex and some redundant locking operations -- the same trick that powers std::condition_variable_any. I used that here.

Win32 support will come in a subsequent commit, before merging to master.
parent 99d308ff
......@@ -119,7 +119,6 @@ TEST(Mutex, MutexGuarded) {
EXPECT_EQ(321u, value.getWithoutLock());
}
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
TEST(Mutex, When) {
MutexGuarded<uint> value(123);
......@@ -170,7 +169,6 @@ TEST(Mutex, When) {
KJ_EXPECT(*value.lockShared() == 101);
}
}
#endif
TEST(Mutex, Lazy) {
Lazy<uint> lazy;
......
......@@ -52,6 +52,20 @@
namespace kj {
namespace _ { // private
inline void Mutex::addWaiter(Waiter& waiter) {
*waitersTail = waiter;
waitersTail = &waiter.next;
}
inline void Mutex::removeWaiter(Waiter& waiter) {
*waiter.prev = waiter.next;
KJ_IF_MAYBE(next, waiter.next) {
next->prev = waiter.prev;
} else {
KJ_DASSERT(waitersTail == &waiter.next);
waitersTail = waiter.prev;
}
}
#if KJ_USE_FUTEX
// =======================================================================================
// Futex-based implementation (Linux-only)
......@@ -105,13 +119,6 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
struct Mutex::Waiter {
kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev;
Predicate& predicate;
uint futex;
};
void Mutex::unlock(Exclusivity exclusivity) {
switch (exclusivity) {
case EXCLUSIVE: {
......@@ -189,19 +196,9 @@ void Mutex::lockWhen(Predicate& predicate) {
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, 0 };
*waitersTail = waiter;
waitersTail = &waiter.next;
KJ_DEFER({
// Remove from list.
*waiter.prev = waiter.next;
KJ_IF_MAYBE(next, waiter.next) {
next->prev = waiter.prev;
} else {
KJ_DASSERT(waitersTail == &waiter.next);
waitersTail = waiter.prev;
}
});
addWaiter(waiter);
KJ_DEFER(removeWaiter(waiter));
if (!predicate.check()) {
unlock(EXCLUSIVE);
......@@ -396,7 +393,37 @@ void Mutex::lock(Exclusivity exclusivity) {
}
void Mutex::unlock(Exclusivity exclusivity) {
KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex));
KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
if (exclusivity == EXCLUSIVE) {
// Check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (waiter->predicate.check()) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use _signal() vs. _broadcast() here since there's always only one thread waiting.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
// We only need to wake one waiter. Note that unlike the futex-based implementation, we
// cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
// that the condition is still true when that waiter finally awakes. However, if the
// condition is no longer true at that point, the waiter will re-check all other waiters'
// conditions and possibly wake up any other waiter who is now ready, hence we still only
// need to wake one waiter here.
break;
}
} else {
// No more waiters.
break;
}
}
}
}
void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
......@@ -419,6 +446,48 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate) {
lock(EXCLUSIVE);
// Add waiter to list.
Waiter waiter {
nullptr, waitersTail, predicate,
PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
};
addWaiter(waiter);
KJ_DEFER({
removeWaiter(waiter);
// Destroy pthread objects.
KJ_PTHREAD_CALL(pthread_mutex_destroy(&waiter.stupidMutex));
KJ_PTHREAD_CALL(pthread_cond_destroy(&waiter.condvar));
});
while (!predicate.check()) {
// pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
// mutex before we unlock the real mutex, and the signaling thread also needs to lock this
// mutex, in order to ensure that this thread is actually waiting on the condvar before it is
// signaled.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
// OK, now we can unlock the main mutex.
unlock(EXCLUSIVE);
// Wait for someone to signal the condvar.
KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
// We have to be very careful about lock ordering here. We need to unlock stupidMutex before
// re-locking the main mutex, because another thread may have a lock on the main mutex already
// and be waiting for a lock on stupidMutex. Note that other thread may signal the condvar
// right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
// because we've already been signaled.
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
lock(EXCLUSIVE);
}
}
Once::Once(bool startInitialized): state(startInitialized ? INITIALIZED : UNINITIALIZED) {
KJ_PTHREAD_CALL(pthread_mutex_init(&mutex, nullptr));
}
......
......@@ -66,7 +66,6 @@ public:
// non-trivial, assert that the mutex is locked (which should be good enough to catch problems
// in unit tests). In non-debug builds, do nothing.
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
class Predicate {
public:
virtual bool check() = 0;
......@@ -74,7 +73,6 @@ public:
void lockWhen(Predicate& predicate);
// Lock (exclusively) when predicate.check() returns true.
#endif
private:
#if KJ_USE_FUTEX
......@@ -89,17 +87,36 @@ private:
static constexpr uint EXCLUSIVE_REQUESTED = 1u << 30;
static constexpr uint SHARED_COUNT_MASK = EXCLUSIVE_REQUESTED - 1;
struct Waiter;
kj::Maybe<Waiter&> waitersHead = nullptr;
kj::Maybe<Waiter&>* waitersTail = &waitersHead;
// linked list of waitUntil()s; can only modify under lock
#elif _WIN32
uintptr_t srwLock; // Actually an SRWLOCK, but don't want to #include <windows.h> in header.
#else
mutable pthread_rwlock_t mutex;
#endif
struct Waiter {
kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev;
Predicate& predicate;
#if KJ_USE_FUTEX
uint futex;
#elif _WIN32
#error "TODO(now): implement Win32 support before merging"
#else
pthread_cond_t condvar;
pthread_mutex_t stupidMutex;
// pthread condvars are only compatible with basic pthread mutexes, not rwlocks, for no
// particularly good reason. To work around this, we need an extra mutex per condvar.
#endif
};
kj::Maybe<Waiter&> waitersHead = nullptr;
kj::Maybe<Waiter&>* waitersTail = &waitersHead;
// linked list of waitUntil()s; can only modify under lock
inline void addWaiter(Waiter& waiter);
inline void removeWaiter(Waiter& waiter);
};
class Once {
......@@ -265,13 +282,12 @@ public:
inline T& getAlreadyLockedExclusive() const;
// Like `getWithoutLock()`, but asserts that the lock is already held by the calling thread.
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
template <typename Cond, typename Func>
auto when(Cond&& condition, Func&& callback) const -> decltype(callback(instance<T&>())) {
// Waits until condition(state) returns true, then calls callback(state) under lock.
//
// `condition`, when called, receives as its parameter a const reference to the state, which is
// locked (either shared or exclusive). `callback` returns a mutable reference, which is
// locked (either shared or exclusive). `callback` receives a mutable reference, which is
// exclusively locked.
//
// `condition()` may be called multiple times, from multiple threads, while waiting for the
......@@ -296,7 +312,6 @@ public:
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
return callback(value);
}
#endif
private:
mutable _::Mutex mutex;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment