Commit e11034dd authored by Kenton Varda's avatar Kenton Varda

When MutexGuarded::when()'s wait condition throws, propagate to caller.

parent ab033b7f
...@@ -183,6 +183,48 @@ TEST(Mutex, When) { ...@@ -183,6 +183,48 @@ TEST(Mutex, When) {
KJ_EXPECT(*value.lockShared() == 101); KJ_EXPECT(*value.lockShared() == 101);
} }
#if !KJ_NO_EXCEPTIONS
{
// Throw from predicate.
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_FAIL_ASSERT("oops threw");
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}));
// Throw from predicate later on.
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_ASSERT(n != 321, "oops threw");
return false;
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}));
}
{
// Verify the exceptions didn't break the mutex.
uint m = value.when([](uint n) { return n > 0; }, [](uint& n) {
return n;
});
KJ_EXPECT(m == 321);
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 654;
});
m = value.when([](uint n) { return n > 500; }, [](uint& n) {
return n;
});
KJ_EXPECT(m == 654);
}
#endif
} }
TEST(Mutex, WhenWithTimeout) { TEST(Mutex, WhenWithTimeout) {
...@@ -260,6 +302,55 @@ TEST(Mutex, WhenWithTimeout) { ...@@ -260,6 +302,55 @@ TEST(Mutex, WhenWithTimeout) {
}, LONG_TIMEOUT); }, LONG_TIMEOUT);
KJ_EXPECT(m == 56); KJ_EXPECT(m == 56);
} }
#if !KJ_NO_EXCEPTIONS
{
// Throw from predicate.
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_FAIL_ASSERT("oops threw");
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}, LONG_TIMEOUT));
// Throw from predicate later on.
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_ASSERT(n != 321, "oops threw");
return false;
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}, LONG_TIMEOUT));
}
{
// Verify the exceptions didn't break the mutex.
uint m = value.when([](uint n) { return n > 0; }, [](uint& n) {
return n;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 321);
auto start = now();
m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
KJ_EXPECT(now() - start >= 10 * kj::MILLISECONDS);
return n + 1;
}, 10 * kj::MILLISECONDS);
KJ_EXPECT(m == 322);
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 654;
});
m = value.when([](uint n) { return n > 500; }, [](uint& n) {
return n;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 654);
}
#endif
} }
TEST(Mutex, Lazy) { TEST(Mutex, Lazy) {
......
...@@ -71,13 +71,34 @@ inline void Mutex::removeWaiter(Waiter& waiter) { ...@@ -71,13 +71,34 @@ inline void Mutex::removeWaiter(Waiter& waiter) {
} }
} }
bool Mutex::checkPredicate(Waiter& waiter) {
// Run the predicate from a thread other than the waiting thread, returning true if it's time to
// signal the waiting thread. This is not only when the predicate passes, but also when it
// throws, in which case we want to propagate the exception to the waiting thread.
if (waiter.exception != nullptr) return true; // don't run again after an exception
bool result = false;
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
result = waiter.predicate.check();
})) {
// Exception thown.
result = true;
waiter.exception = kj::heap(kj::mv(*exception));
};
return result;
}
#if !_WIN32 #if !_WIN32
namespace { namespace {
TimePoint toTimePoint(struct timespec ts) {
return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
}
TimePoint now() { TimePoint now() {
struct timespec now; struct timespec now;
KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now)); KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
return kj::origin<TimePoint>() + now.tv_sec * kj::SECONDS + now.tv_nsec * kj::NANOSECONDS; return toTimePoint(now);
} }
struct timespec toRelativeTimespec(Duration timeout) { struct timespec toRelativeTimespec(Duration timeout) {
struct timespec ts; struct timespec ts;
...@@ -157,7 +178,7 @@ void Mutex::unlock(Exclusivity exclusivity) { ...@@ -157,7 +178,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (waiter->predicate.check()) { if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. // This waiter's predicate now evaluates true, so wake it up.
if (waiter->hasTimeout) { if (waiter->hasTimeout) {
// In this case we need to be careful to make sure the target thread isn't already // In this case we need to be careful to make sure the target thread isn't already
...@@ -249,7 +270,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -249,7 +270,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
}); });
// Add waiter to list. // Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, 0, timeout != nullptr }; Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
addWaiter(waiter); addWaiter(waiter);
KJ_DEFER(removeWaiter(waiter)); KJ_DEFER(removeWaiter(waiter));
...@@ -301,7 +322,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -301,7 +322,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_PRIVATE)", error); KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_PRIVATE)", error);
} }
if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) != 0) { if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
// We received a lock ownership transfer from another thread. // We received a lock ownership transfer from another thread.
currentlyLocked = true; currentlyLocked = true;
...@@ -309,6 +330,16 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -309,6 +330,16 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
#ifdef KJ_DEBUG #ifdef KJ_DEBUG
assertLockedByCaller(EXCLUSIVE); assertLockedByCaller(EXCLUSIVE);
#endif #endif
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
return; return;
} }
} }
...@@ -411,7 +442,7 @@ void Mutex::unlock(Exclusivity exclusivity) { ...@@ -411,7 +442,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (waiter->predicate.check()) { if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use Wake vs. WakeAll here since there's always only one thread waiting. // use Wake vs. WakeAll here since there's always only one thread waiting.
WakeConditionVariable(&coercedCondvar(waiter->condvar)); WakeConditionVariable(&coercedCondvar(waiter->condvar));
...@@ -448,8 +479,11 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) { ...@@ -448,8 +479,11 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE); lock(EXCLUSIVE);
// Any exceptions should leave the mutex unlocked.
KJ_ON_SCOPE_FAILURE(unlock(EXCLUSIVE));
// Add waiter to list. // Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate }; Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE), static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
"CONDITION_VARIABLE is not a pointer?"); "CONDITION_VARIABLE is not a pointer?");
InitializeConditionVariable(&coercedCondvar(waiter.condvar)); InitializeConditionVariable(&coercedCondvar(waiter.condvar));
...@@ -478,6 +512,15 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -478,6 +512,15 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
KJ_FAIL_WIN32("SleepConditionVariableSRW()", error); KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
} }
} }
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
} }
} }
...@@ -566,7 +609,7 @@ void Mutex::unlock(Exclusivity exclusivity) { ...@@ -566,7 +609,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) { KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next; nextWaiter = waiter->next;
if (waiter->predicate.check()) { if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we // This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use _signal() vs. _broadcast() here since there's always only one thread waiting. // use _signal() vs. _broadcast() here since there's always only one thread waiting.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex)); KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
...@@ -612,9 +655,16 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) { ...@@ -612,9 +655,16 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE); lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
// Add waiter to list. // Add waiter to list.
Waiter waiter { Waiter waiter {
nullptr, waitersTail, predicate, nullptr, waitersTail, predicate, nullptr,
PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
}; };
...@@ -653,6 +703,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -653,6 +703,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
// OK, now we can unlock the main mutex. // OK, now we can unlock the main mutex.
unlock(EXCLUSIVE); unlock(EXCLUSIVE);
currentlyLocked = false;
bool timedOut = false; bool timedOut = false;
...@@ -663,8 +714,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -663,8 +714,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
// which means modifying the system clock will break the wait. However, macOS happens to // which means modifying the system clock will break the wait. However, macOS happens to
// provide an alternative relative-time wait function, so I guess we'll use that. It does // provide an alternative relative-time wait function, so I guess we'll use that. It does
// require recomputing the time every iteration... // require recomputing the time every iteration...
struct timespec ts = toRelativeTimespec( struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
kj::max(toAbsoluteTimespec(*t) - now(), 0 * kj::SECONDS));
int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts); int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
#else #else
int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t); int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
...@@ -688,6 +738,16 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) { ...@@ -688,6 +738,16 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex)); KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
lock(EXCLUSIVE); lock(EXCLUSIVE);
currentlyLocked = true;
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
if (timedOut) { if (timedOut) {
return; return;
......
...@@ -41,6 +41,8 @@ ...@@ -41,6 +41,8 @@
namespace kj { namespace kj {
class Exception;
// ======================================================================================= // =======================================================================================
// Private details -- public interfaces follow below. // Private details -- public interfaces follow below.
...@@ -101,6 +103,7 @@ private: ...@@ -101,6 +103,7 @@ private:
kj::Maybe<Waiter&> next; kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev; kj::Maybe<Waiter&>* prev;
Predicate& predicate; Predicate& predicate;
Maybe<Own<Exception>> exception;
#if KJ_USE_FUTEX #if KJ_USE_FUTEX
uint futex; uint futex;
bool hasTimeout; bool hasTimeout;
...@@ -122,6 +125,7 @@ private: ...@@ -122,6 +125,7 @@ private:
inline void addWaiter(Waiter& waiter); inline void addWaiter(Waiter& waiter);
inline void removeWaiter(Waiter& waiter); inline void removeWaiter(Waiter& waiter);
bool checkPredicate(Waiter& waiter);
}; };
class Once { class Once {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment