Commit 786bcfee authored by Kenton Varda's avatar Kenton Varda

Fix deadlock in condvar wait with flapping predicates.

I observed the cygwin async-xthread-test getting deadlocked here, and noticed the bug. However, the predicate in question was not flappy, so this doesn't really fix async-xthread-test.
parent f91bed86
...@@ -570,5 +570,27 @@ KJ_TEST("ExternalMutexGuarded<T> destroy without release") { ...@@ -570,5 +570,27 @@ KJ_TEST("ExternalMutexGuarded<T> destroy without release") {
} }
} }
KJ_TEST("condvar wait with flapping predicate") {
// This used to deadlock under some implementations due to a wait() checking its own predicate
// as part of unlock()ing the mutex. Adding `waiterToSkip` fixed this (and also eliminated a
// redundant call to the predicate).
MutexGuarded<uint> guarded(0);
Thread thread([&]() {
delay();
*guarded.lockExclusive() = 1;
});
{
auto lock = guarded.lockExclusive();
bool flap = true;
lock.wait([&](uint i) {
flap = !flap;
return i == 1 || flap;
});
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -172,7 +172,7 @@ void Mutex::lock(Exclusivity exclusivity) { ...@@ -172,7 +172,7 @@ void Mutex::lock(Exclusivity exclusivity) {
} }
} }
void Mutex::unlock(Exclusivity exclusivity) { void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
switch (exclusivity) { switch (exclusivity) {
case EXCLUSIVE: { case EXCLUSIVE: {
KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked."); KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");
...@@ -184,7 +184,7 @@ void Mutex::unlock(Exclusivity exclusivity) { ...@@ -184,7 +184,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (checkPredicate(*waiter)) { if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. // This waiter's predicate now evaluates true, so wake it up.
if (waiter->hasTimeout) { if (waiter->hasTimeout) {
// In this case we need to be careful to make sure the target thread isn't already // In this case we need to be careful to make sure the target thread isn't already
...@@ -279,7 +279,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -279,7 +279,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
}); });
if (!predicate.check()) { if (!predicate.check()) {
unlock(EXCLUSIVE); unlock(EXCLUSIVE, &waiter);
currentlyLocked = false; currentlyLocked = false;
struct timespec ts; struct timespec ts;
...@@ -447,7 +447,7 @@ void Mutex::lock(Exclusivity exclusivity) { ...@@ -447,7 +447,7 @@ void Mutex::lock(Exclusivity exclusivity) {
} }
} }
void Mutex::wakeReadyWaiter() { void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
// Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than // Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
// one waiter because only one waiter could get the lock anyway, and once it releases that lock // one waiter because only one waiter could get the lock anyway, and once it releases that lock
// it will awake the next waiter if necessary. // it will awake the next waiter if necessary.
...@@ -457,7 +457,7 @@ void Mutex::wakeReadyWaiter() { ...@@ -457,7 +457,7 @@ void Mutex::wakeReadyWaiter() {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (checkPredicate(*waiter)) { if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use Wake vs. WakeAll here since there's always only one thread waiting. // use Wake vs. WakeAll here since there's always only one thread waiting.
WakeConditionVariable(&coercedCondvar(waiter->condvar)); WakeConditionVariable(&coercedCondvar(waiter->condvar));
...@@ -477,14 +477,14 @@ void Mutex::wakeReadyWaiter() { ...@@ -477,14 +477,14 @@ void Mutex::wakeReadyWaiter() {
} }
} }
void Mutex::unlock(Exclusivity exclusivity) { void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
switch (exclusivity) { switch (exclusivity) {
case EXCLUSIVE: { case EXCLUSIVE: {
KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock)); KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));
// Check if there are any conditional waiters. Note we only do this when unlocking an // Check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed. // exclusive lock since under a shared lock the state couldn't have changed.
wakeReadyWaiter(); wakeReadyWaiter(waiterToSkip);
break; break;
} }
...@@ -540,7 +540,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -540,7 +540,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
while (!predicate.check()) { while (!predicate.check()) {
// SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other // SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
// waiters that are now ready. // waiters that are now ready.
wakeReadyWaiter(); wakeReadyWaiter(&waiter);
if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) { if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
// Normal result. Continue loop to check predicate. // Normal result. Continue loop to check predicate.
...@@ -669,7 +669,7 @@ void Mutex::lock(Exclusivity exclusivity) { ...@@ -669,7 +669,7 @@ void Mutex::lock(Exclusivity exclusivity) {
} }
} }
void Mutex::unlock(Exclusivity exclusivity) { void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex))); KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
if (exclusivity == EXCLUSIVE) { if (exclusivity == EXCLUSIVE) {
...@@ -680,7 +680,7 @@ void Mutex::unlock(Exclusivity exclusivity) { ...@@ -680,7 +680,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (checkPredicate(*waiter)) { if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use _signal() vs. _broadcast() here since there's always only one thread waiting. // use _signal() vs. _broadcast() here since there's always only one thread waiting.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex)); KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
...@@ -768,7 +768,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -768,7 +768,7 @@ void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex)); KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
// OK, now we can unlock the main mutex. // OK, now we can unlock the main mutex.
unlock(EXCLUSIVE); unlock(EXCLUSIVE, &waiter);
currentlyLocked = false; currentlyLocked = false;
bool timedOut = false; bool timedOut = false;
......
...@@ -51,6 +51,8 @@ namespace _ { // private ...@@ -51,6 +51,8 @@ namespace _ { // private
class Mutex { class Mutex {
// Internal implementation details. See `MutexGuarded<T>`. // Internal implementation details. See `MutexGuarded<T>`.
struct Waiter;
public: public:
Mutex(); Mutex();
~Mutex(); ~Mutex();
...@@ -62,7 +64,7 @@ public: ...@@ -62,7 +64,7 @@ public:
}; };
void lock(Exclusivity exclusivity); void lock(Exclusivity exclusivity);
void unlock(Exclusivity exclusivity); void unlock(Exclusivity exclusivity, Waiter* waiterToSkip = nullptr);
void assertLockedByCaller(Exclusivity exclusivity); void assertLockedByCaller(Exclusivity exclusivity);
// In debug mode, assert that the mutex is locked by the calling thread, or if that is // In debug mode, assert that the mutex is locked by the calling thread, or if that is
...@@ -134,7 +136,7 @@ private: ...@@ -134,7 +136,7 @@ private:
inline void removeWaiter(Waiter& waiter); inline void removeWaiter(Waiter& waiter);
bool checkPredicate(Waiter& waiter); bool checkPredicate(Waiter& waiter);
#if _WIN32 #if _WIN32
void wakeReadyWaiter(); void wakeReadyWaiter(Waiter* waiterToSkip);
#endif #endif
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment