Commit 2625589f authored by Kenton Varda's avatar Kenton Varda

Add `Locked<T>::wait()`, an alternate interface to condvars.

This is better than `MutexGuarded<T>::when()` in cases where the caller already has a lock -- `when()` would require unlocking, then immediately locking again (to check the predicate).

Possibly `when()` should be removed in favor of `wait()`.
parent 6daea3d2
...@@ -265,24 +265,17 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) { ...@@ -265,24 +265,17 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
} }
} }
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
// Add waiter to list. // Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr }; Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
addWaiter(waiter); addWaiter(waiter);
// To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
// currently.
bool currentlyLocked = true;
KJ_DEFER({ KJ_DEFER({
if (!currentlyLocked) lock(EXCLUSIVE); if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter); removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
}); });
if (!predicate.check()) { if (!predicate.check()) {
...@@ -500,12 +493,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) { ...@@ -500,12 +493,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
// held for debug purposes anyway, we just don't bother. // held for debug purposes anyway, we just don't bother.
} }
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Any exceptions should leave the mutex unlocked.
KJ_ON_SCOPE_FAILURE(unlock(EXCLUSIVE));
// Add waiter to list. // Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 }; Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE), static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
...@@ -723,27 +711,20 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) { ...@@ -723,27 +711,20 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
} }
} }
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
// Add waiter to list. // Add waiter to list.
Waiter waiter { Waiter waiter {
nullptr, waitersTail, predicate, nullptr, nullptr, waitersTail, predicate, nullptr,
PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
}; };
addWaiter(waiter); addWaiter(waiter);
// To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
// currently.
bool currentlyLocked = true;
KJ_DEFER({ KJ_DEFER({
if (!currentlyLocked) lock(EXCLUSIVE); if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter); removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
// Destroy pthread objects. // Destroy pthread objects.
KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex)); KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
......
...@@ -74,14 +74,16 @@ public: ...@@ -74,14 +74,16 @@ public:
virtual bool check() = 0; virtual bool check() = 0;
}; };
void lockWhen(Predicate& predicate, Maybe<Duration> timeout = nullptr); void wait(Predicate& predicate, Maybe<Duration> timeout = nullptr);
// Lock (exclusively) when predicate.check() returns true, or when the timeout (if any) expires. // If predicate.check() returns false, unlock the mutex until predicate.check() returns true, or
// The mutex is always locked when this returns regardless of whether the timeout expired (and // when the timeout (if any) expires. The mutex is always re-locked when this returns regardless
// always unlocked if it throws). // of whether the timeout expired, and including if it throws.
//
// Requires that the mutex is already exclusively locked before calling.
void induceSpuriousWakeupForTest(); void induceSpuriousWakeupForTest();
// Utility method for mutex-test.c++ which causes a spurious thread wakeup on all threads that // Utility method for mutex-test.c++ which causes a spurious thread wakeup on all threads that
// are waiting for a lockWhen() condition. Assuming correct implementation, all those threads // are waiting for a wait() condition. Assuming correct implementation, all those threads
// should immediately go back to sleep. // should immediately go back to sleep.
private: private:
...@@ -241,6 +243,31 @@ public: ...@@ -241,6 +243,31 @@ public:
inline operator T*() { return ptr; } inline operator T*() { return ptr; }
inline operator const T*() const { return ptr; } inline operator const T*() const { return ptr; }
template <typename Cond>
void wait(Cond&& condition, Maybe<Duration> timeout = nullptr) {
// Unlocks the lock until `condition(state)` evaluates true (where `state` is type `const T&`
// referencing the object protected by the lock).
// We can't wait on a shared lock because the internal bookkeeping needed for a wait requires
// the protection of an exclusive lock.
static_assert(!isConst<T>(), "cannot wait() on shared lock");
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
return condition(value);
}
Cond&& condition;
const T& value;
PredicateImpl(Cond&& condition, const T& value)
: condition(kj::fwd<Cond>(condition)), value(value) {}
};
PredicateImpl impl(kj::fwd<Cond>(condition), *ptr);
mutex->wait(impl, timeout);
}
private: private:
_::Mutex* mutex; _::Mutex* mutex;
T* ptr; T* ptr;
...@@ -321,22 +348,11 @@ public: ...@@ -321,22 +348,11 @@ public:
// If `timeout` is specified, then after the given amount of time, the callback will be called // If `timeout` is specified, then after the given amount of time, the callback will be called
// regardless of whether the condition is true. In this case, when `callback()` is called, // regardless of whether the condition is true. In this case, when `callback()` is called,
// `condition()` may in fact evaluate false, but *only* if the timeout was reached. // `condition()` may in fact evaluate false, but *only* if the timeout was reached.
//
// TODO(cleanup): lock->wait() is a better interface. Can we deprecate this one?
struct PredicateImpl final: public _::Mutex::Predicate { auto lock = lockExclusive();
bool check() override { lock.wait(kj::fwd<Cond>(condition), timeout);
return condition(value);
}
Cond&& condition;
const T& value;
PredicateImpl(Cond&& condition, const T& value)
: condition(kj::fwd<Cond>(condition)), value(value) {}
};
PredicateImpl impl(kj::fwd<Cond>(condition), value);
mutex.lockWhen(impl, timeout);
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
return callback(value); return callback(value);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment