Unverified Commit dab11026 authored by Kenton Varda, committed by GitHub

Merge pull request #839 from capnproto/portable-condvar

Implement MutexGuarded::when() (i.e. condvars) on all platforms.
parents 1560b9a5 4312f987
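
For orientation before the diff, here is a minimal usage sketch of the API this change completes, modeled on the tests added below. The function name `example()` and the concrete values are illustrative, not part of the commit:

    #include "mutex.h"   // kj::MutexGuarded
    #include "thread.h"  // kj::Thread
    #include "debug.h"   // KJ_ASSERT

    void example() {
      kj::MutexGuarded<uint> value(0);

      kj::Thread producer([&]() {
        *value.lockExclusive() = 42;  // unlocking re-checks waiters' predicates
      });

      // Blocks until the predicate returns true, then invokes the callback with
      // the value exclusively locked. The timeout is optional; if it expires,
      // the callback runs even though the predicate is still false.
      uint result = value.when(
          [](const uint& n) { return n == 42; },  // predicate
          [](uint& n) { return n + 1; },          // callback
          10 * kj::SECONDS);                      // timeout (new in this PR)
      KJ_ASSERT(result == 43);
    }  // kj::Thread's destructor joins the producer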
c++/src/kj/mutex-test.c++
@@ -19,6 +19,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#define WINVER 0x0600
#define _WIN32_WINNT 0x0600
#define NOGDI // NOGDI is needed to make EXPECT_EQ(123u, *lock) compile for some reason
#endif
#define KJ_MUTEX_TEST 1
#include "mutex.h"
#include "debug.h"
#include "thread.h"
@@ -26,12 +35,12 @@
#include <stdlib.h>
#if _WIN32
#define NOGDI // NOGDI is needed to make EXPECT_EQ(123u, *lock) compile for some reason
#include <windows.h>
#undef NOGDI
#else
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#endif
namespace kj {
@@ -39,8 +48,32 @@ namespace {
#if _WIN32
inline void delay() { Sleep(10); }
LARGE_INTEGER qpcBase;
LARGE_INTEGER qpcFreq;
bool qpcInitialized = false;
TimePoint now() {
// Use our own time origin so that QPC values are small and don't overflow when we multiply by
// 1000000.
if (!qpcInitialized) {
QueryPerformanceCounter(&qpcBase);
QueryPerformanceFrequency(&qpcFreq);
qpcInitialized = true;
}
LARGE_INTEGER qpc;
QueryPerformanceCounter(&qpc);
uint64_t micros = (qpc.QuadPart - qpcBase.QuadPart) * 1000000 / qpcFreq.QuadPart;
return kj::origin<TimePoint>() + micros * kj::MICROSECONDS;
}
#else
inline void delay() { usleep(10000); }
TimePoint now() {
struct timespec now;
KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
return kj::origin<TimePoint>() + now.tv_sec * kj::SECONDS + now.tv_nsec * kj::NANOSECONDS;
}
#endif
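
As a back-of-the-envelope check of the overflow concern noted in the Windows now() above (the 10 MHz counter frequency is an assumption for illustration; actual QPC frequencies vary by machine):

    #include <cstdint>

    // ticks * 1000000 must fit in uint64_t, so ticks must stay below
    // 2^64 / 10^6, roughly 1.8e13. At an assumed 10 MHz QPC frequency a
    // boot-relative counter crosses that bound after about 1.8 million
    // seconds (~21 days of uptime), so rebasing against a process-local
    // origin is what keeps the multiplication safe.
    constexpr uint64_t maxSafeTicks = ~uint64_t(0) / 1000000;  // ~1.8e13
    constexpr uint64_t assumedQpcHz = 10000000;                // 10 MHz
    constexpr uint64_t secondsToOverflow = maxSafeTicks / assumedQpcHz;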
TEST(Mutex, MutexGuarded) {
@@ -119,7 +152,6 @@ TEST(Mutex, MutexGuarded) {
EXPECT_EQ(321u, value.getWithoutLock());
}
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
TEST(Mutex, When) {
MutexGuarded<uint> value(123);
@@ -169,8 +201,232 @@ TEST(Mutex, When) {
KJ_EXPECT(*value.lockShared() == 101);
}
#if !KJ_NO_EXCEPTIONS
{
// Throw from predicate.
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_FAIL_ASSERT("oops threw");
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}));
// Throw from predicate later on.
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_ASSERT(n != 321, "oops threw");
return false;
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}));
}
{
// Verify the exceptions didn't break the mutex.
uint m = value.when([](uint n) { return n > 0; }, [](uint& n) {
return n;
});
KJ_EXPECT(m == 321);
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 654;
});
m = value.when([](uint n) { return n > 500; }, [](uint& n) {
return n;
});
KJ_EXPECT(m == 654);
}
#endif
}
TEST(Mutex, WhenWithTimeout) {
MutexGuarded<uint> value(123);
// A timeout that won't expire.
static constexpr Duration LONG_TIMEOUT = 10 * kj::SECONDS;
{
uint m = value.when([](uint n) { return n < 200; }, [](uint& n) {
++n;
return n + 2;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 126);
KJ_EXPECT(*value.lockShared() == 124);
}
{
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
uint m = value.when([](uint n) { return n > 200; }, [](uint& n) {
++n;
return n + 2;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 324);
KJ_EXPECT(*value.lockShared() == 322);
}
{
// Stress test. 100 threads each wait for a value and then set the next value.
*value.lockExclusive() = 0;
auto threads = kj::heapArrayBuilder<kj::Own<kj::Thread>>(100);
for (auto i: kj::zeroTo(100)) {
threads.add(kj::heap<kj::Thread>([i,&value]() {
if (i % 2 == 0) delay();
uint m = value.when([i](const uint& n) { return n == i; },
[](uint& n) { return n++; }, LONG_TIMEOUT);
KJ_ASSERT(m == i);
}));
}
uint m = value.when([](uint n) { return n == 100; }, [](uint& n) {
return n++;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 100);
KJ_EXPECT(*value.lockShared() == 101);
}
{
auto start = now();
uint m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
KJ_ASSERT(n == 101);
KJ_EXPECT(now() - start >= 10 * kj::MILLISECONDS);
return 12;
}, 10 * kj::MILLISECONDS);
KJ_EXPECT(m == 12);
m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
KJ_ASSERT(n == 101);
KJ_EXPECT(now() - start >= 20 * kj::MILLISECONDS);
return 34;
}, 10 * kj::MILLISECONDS);
KJ_EXPECT(m == 34);
m = value.when([](uint n) { return n > 0; }, [&](uint& n) {
KJ_ASSERT(n == 101);
return 56;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 56);
}
#if !KJ_NO_EXCEPTIONS
{
// Throw from predicate.
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_FAIL_ASSERT("oops threw");
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}, LONG_TIMEOUT));
// Throw from predicate later on.
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 321;
});
KJ_EXPECT_THROW_MESSAGE("oops threw", value.when([](uint n) -> bool {
KJ_ASSERT(n != 321, "oops threw");
return false;
}, [](uint& n) {
KJ_FAIL_EXPECT("shouldn't get here");
}, LONG_TIMEOUT));
}
{
// Verify the exceptions didn't break the mutex.
uint m = value.when([](uint n) { return n > 0; }, [](uint& n) {
return n;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 321);
auto start = now();
m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
KJ_EXPECT(now() - start >= 10 * kj::MILLISECONDS);
return n + 1;
}, 10 * kj::MILLISECONDS);
KJ_EXPECT(m == 322);
kj::Thread thread([&]() {
delay();
*value.lockExclusive() = 654;
});
m = value.when([](uint n) { return n > 500; }, [](uint& n) {
return n;
}, LONG_TIMEOUT);
KJ_EXPECT(m == 654);
}
#endif
}
TEST(Mutex, WhenWithTimeoutPreciseTiming) {
// Test that MutexGuarded::when() with a timeout sleeps for precisely the right amount of time.
for (uint retryCount = 0; retryCount < 20; retryCount++) {
MutexGuarded<uint> value(123);
auto start = now();
uint m = value.when([&value](uint n) {
// HACK: Reset the value as a way of testing what happens when the waiting thread is woken
// up but then finds it's not ready yet.
value.getWithoutLock() = 123;
return n == 321;
}, [](uint& n) {
return 456;
}, 20 * kj::MILLISECONDS);
KJ_EXPECT(m == 456);
auto t = now() - start;
KJ_EXPECT(t >= 20 * kj::MILLISECONDS);
if (t <= 22 * kj::MILLISECONDS) {
return;
}
}
KJ_FAIL_ASSERT("time not within expected bounds even after retries");
}
TEST(Mutex, WhenWithTimeoutPreciseTimingAfterInterrupt) {
// Test that MutexGuarded::when() with a timeout sleeps for precisely the right amount of time,
// even if the thread is spuriously woken in the middle.
for (uint retryCount = 0; retryCount < 20; retryCount++) {
MutexGuarded<uint> value(123);
kj::Thread thread([&]() {
delay();
value.lockExclusive().induceSpuriousWakeupForTest();
});
auto start = now();
uint m = value.when([](uint n) {
return n == 321;
}, [](uint& n) {
return 456;
}, 20 * kj::MILLISECONDS);
KJ_EXPECT(m == 456);
auto t = now() - start;
KJ_EXPECT(t >= 20 * kj::MILLISECONDS, t / kj::MILLISECONDS);
if (t <= 22 * kj::MILLISECONDS) {
return;
}
}
KJ_FAIL_ASSERT("time not within expected bounds even after retries");
}
TEST(Mutex, Lazy) {
Lazy<uint> lazy;
c++/src/kj/mutex.c++
@@ -28,6 +28,11 @@
#include "mutex.h"
#include "debug.h"
#if !_WIN32
#include <time.h>
#include <errno.h>
#endif
#if KJ_USE_FUTEX
#include <unistd.h>
#include <sys/syscall.h>
@@ -52,6 +57,68 @@
namespace kj {
namespace _ { // private
inline void Mutex::addWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
assertLockedByCaller(EXCLUSIVE);
#endif
*waitersTail = waiter;
waitersTail = &waiter.next;
}
inline void Mutex::removeWaiter(Waiter& waiter) {
#ifdef KJ_DEBUG
assertLockedByCaller(EXCLUSIVE);
#endif
*waiter.prev = waiter.next;
KJ_IF_MAYBE(next, waiter.next) {
next->prev = waiter.prev;
} else {
KJ_DASSERT(waitersTail == &waiter.next);
waitersTail = waiter.prev;
}
}
bool Mutex::checkPredicate(Waiter& waiter) {
// Run the predicate from a thread other than the waiting thread, returning true if it's time to
// signal the waiting thread. That is the case not only when the predicate passes, but also when
// it throws, in which case we want to propagate the exception to the waiting thread.
if (waiter.exception != nullptr) return true; // don't run again after an exception
bool result = false;
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
result = waiter.predicate.check();
})) {
// Exception thrown.
result = true;
waiter.exception = kj::heap(kj::mv(*exception));
};
return result;
}
#if !_WIN32
namespace {
TimePoint toTimePoint(struct timespec ts) {
return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
}
TimePoint now() {
struct timespec now;
KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
return toTimePoint(now);
}
struct timespec toRelativeTimespec(Duration timeout) {
struct timespec ts;
ts.tv_sec = timeout / kj::SECONDS;
ts.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
return ts;
}
struct timespec toAbsoluteTimespec(TimePoint time) {
return toRelativeTimespec(time - kj::origin<TimePoint>());
}
} // namespace
#endif
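
As a quick sanity check of these conversions, the same split in plain integer arithmetic (`fromNanos` is a hypothetical standalone helper, not part of the commit):

    #include <time.h>
    #include <stdint.h>

    // E.g. 1500 ms = 1500000000 ns -> { tv_sec = 1, tv_nsec = 500000000 },
    // matching what toRelativeTimespec() produces for 1500 * kj::MILLISECONDS.
    struct timespec fromNanos(int64_t ns) {
      struct timespec ts;
      ts.tv_sec = ns / 1000000000;
      ts.tv_nsec = ns % 1000000000;
      return ts;
    }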
#if KJ_USE_FUTEX
// =======================================================================================
// Futex-based implementation (Linux-only)
@@ -84,7 +151,7 @@ void Mutex::lock(Exclusivity exclusivity) {
state |= EXCLUSIVE_REQUESTED;
}
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
}
break;
case SHARED: {
@@ -97,7 +164,7 @@ void Mutex::lock(Exclusivity exclusivity) {
// The mutex is exclusively locked by another thread. Since we incremented the counter
// already, we just have to wait for it to be unlocked.
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, nullptr, nullptr, 0);
state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
}
break;
@@ -105,13 +172,6 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
struct Mutex::Waiter {
kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev;
Predicate& predicate;
uint futex;
};
void Mutex::unlock(Exclusivity exclusivity) {
switch (exclusivity) {
case EXCLUSIVE: {
@@ -124,10 +184,31 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (waiter->predicate.check()) {
if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up.
__atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
if (waiter->hasTimeout) {
// In this case we need to be careful to make sure the target thread isn't already
// processing a timeout, so we need to do an atomic CAS rather than just a store.
uint expected = 0;
if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
__ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
// Good, we set it to 1, transferring ownership of the mutex. Continue on below.
} else {
// Looks like the thread already timed out and set its own futex to 1. In that
// case it is going to try to lock the mutex itself, so we should NOT attempt an
// ownership transfer as this will deadlock.
//
// We have two options here: We can continue along the waiter list looking for
// another waiter that's ready to be signaled, or we could drop out of the list
// immediately since we know that another thread is already waiting for the lock
// and will re-evaluate the waiter queue itself when it is done. It feels cleaner
// to me to continue.
continue;
}
} else {
__atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
}
syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
// We transferred ownership of the lock to this waiter, so we're done now.
return;
@@ -147,7 +228,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
// the lock, and we must wake them up. If there are any exclusive waiters, we must wake
// them up even if readers are waiting so that at the very least they may re-establish the
// EXCLUSIVE_REQUESTED bit that we just removed.
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
break;
}
@@ -163,7 +244,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
&futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
// Wake all exclusive waiters. We have to wake all of them because one of them will
// grab the lock while the others will re-establish the exclusive-requested bit.
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
}
break;
@@ -184,37 +265,107 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate) {
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, 0 };
*waitersTail = waiter;
waitersTail = &waiter.next;
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
addWaiter(waiter);
KJ_DEFER({
// Remove from list.
*waiter.prev = waiter.next;
KJ_IF_MAYBE(next, waiter.next) {
next->prev = waiter.prev;
} else {
KJ_DASSERT(waitersTail == &waiter.next);
waitersTail = waiter.prev;
}
if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
});
if (!predicate.check()) {
unlock(EXCLUSIVE);
currentlyLocked = false;
// Wait for someone to set out futex to 1.
while (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) == 0) {
syscall(SYS_futex, &waiter.futex, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
struct timespec ts;
struct timespec* tsp = nullptr;
KJ_IF_MAYBE(t, timeout) {
ts = toAbsoluteTimespec(now() + *t);
tsp = &ts;
}
// Ownership of an exclusive lock was transferred to us. We can continue.
// Wait for someone to set our futex to 1.
for (;;) {
// Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
// FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
// CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
// us to recompute the time after every iteration.
KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
&waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, nullptr, FUTEX_BITSET_MATCH_ANY)) {
case EAGAIN:
// Indicates that the futex was already non-zero by the time the kernel looked at it.
// Not an error.
break;
case ETIMEDOUT: {
// Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
// transferred to us from another thread. So, we need to lock it ourselves. But, another
// thread might be in the process of signaling us and transferring ownership. So, we
// first must atomically take control of our destiny.
KJ_ASSERT(timeout != nullptr);
uint expected = 0;
if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
__ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
// OK, we set our own futex to 1. That means no other thread will, and so we won't be
// receiving a mutex ownership transfer. We have to lock the mutex ourselves.
lock(EXCLUSIVE);
currentlyLocked = true;
return;
} else {
// Oh, someone else actually did signal us, apparently. Let's move on as if the futex
// call told us so.
break;
}
}
default:
KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_BITSET_PRIVATE)", error);
}
if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE)) {
// We received a lock ownership transfer from another thread.
currentlyLocked = true;
// The other thread checked the predicate before the transfer.
#ifdef KJ_DEBUG
assertLockedByCaller(EXCLUSIVE);
assertLockedByCaller(EXCLUSIVE);
#endif
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
return;
}
}
}
}
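
For readers who haven't met this futex variant, here is a standalone, Linux-only sketch of the absolute-deadline wait the comments above describe. It is independent of kj; `waitUntilDeadline` and its parameter names are illustrative:

    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <errno.h>
    #include <time.h>
    #include <stdint.h>

    // Wait for *addr to become non-zero, or until an absolute CLOCK_MONOTONIC
    // deadline. FUTEX_WAIT_BITSET_PRIVATE interprets the timespec as an
    // absolute time, so spurious wakeups can simply retry without recomputing
    // a relative timeout; FUTEX_BITSET_MATCH_ANY makes it otherwise behave
    // like a plain FUTEX_WAIT.
    static int waitUntilDeadline(uint32_t* addr, const struct timespec* deadline) {
      while (__atomic_load_n(addr, __ATOMIC_ACQUIRE) == 0) {
        long rc = syscall(SYS_futex, addr, FUTEX_WAIT_BITSET_PRIVATE,
                          0, deadline, nullptr, FUTEX_BITSET_MATCH_ANY);
        if (rc == 0 || errno == EAGAIN || errno == EINTR) continue;  // re-check
        if (errno == ETIMEDOUT) return -1;  // deadline passed
        return -2;                          // unexpected error
      }
      return 0;  // *addr became non-zero
    }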
void Mutex::induceSpuriousWakeupForTest() {
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
} else {
// No more waiters.
break;
}
}
}
@@ -230,7 +381,7 @@ startOver:
if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) ==
INITIALIZING_WITH_WAITERS) {
// Someone was waiting for us to finish.
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
});
@@ -239,7 +390,7 @@ startOver:
if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) ==
INITIALIZING_WITH_WAITERS) {
// Someone was waiting for us to finish.
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
} else {
for (;;) {
@@ -257,7 +408,8 @@ startOver:
}
// Wait for initialization.
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS, NULL, NULL, 0);
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS,
nullptr, nullptr, 0);
state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
if (state == UNINITIALIZED) {
@@ -283,6 +435,7 @@ void Once::reset() {
#define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
#define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
#define coercedCondvar(var) (*reinterpret_cast<CONDITION_VARIABLE*>(&var))
Mutex::Mutex() {
static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?");
@@ -303,9 +456,37 @@ void Mutex::lock(Exclusivity exclusivity) {
void Mutex::unlock(Exclusivity exclusivity) {
switch (exclusivity) {
case EXCLUSIVE:
ReleaseSRWLockExclusive(&coercedSrwLock);
case EXCLUSIVE: {
KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));
// Check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use Wake vs. WakeAll here since there's always only one thread waiting.
WakeConditionVariable(&coercedCondvar(waiter->condvar));
// We only need to wake one waiter. Note that unlike the futex-based implementation, we
// cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
// that the condition is still true when that waiter finally awakes. However, if the
// condition is no longer true at that point, the waiter will re-check all other
// waiters' conditions and possibly wake up any other waiter who is now ready, hence we
// still only need to wake one waiter here.
return;
}
} else {
// No more waiters.
break;
}
}
break;
}
case SHARED:
ReleaseSRWLockShared(&coercedSrwLock);
break;
@@ -319,6 +500,110 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
// held for debug purposes anyway, we just don't bother.
}
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Any exceptions should leave the mutex unlocked.
KJ_ON_SCOPE_FAILURE(unlock(EXCLUSIVE));
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
"CONDITION_VARIABLE is not a pointer?");
InitializeConditionVariable(&coercedCondvar(waiter.condvar));
addWaiter(waiter);
KJ_DEFER(removeWaiter(waiter));
DWORD sleepMs;
// Only initialized if `timeout` is non-null.
LARGE_INTEGER frequency;
LARGE_INTEGER endTime;
KJ_IF_MAYBE(t, timeout) {
// Windows sleeps are inaccurate -- they can be longer *or shorter* than the requested amount.
// For many use cases of our API, a too-short sleep would be unacceptable. Experimentally, it
// seems like sleeps can be up to half a millisecond short, so we'll add half a millisecond
// (and then we round up, below).
*t += 500 * kj::MICROSECONDS;
// Compute initial sleep time.
sleepMs = *t / kj::MILLISECONDS;
if (*t % kj::MILLISECONDS > 0 * kj::SECONDS) {
// We guarantee we won't wake up too early.
++sleepMs;
}
// Also compute the timeout absolute time in Performance Counter ticks, in case we need to
// restart the wait later.
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&endTime);
auto numerator = *t / kj::MILLISECONDS * frequency.QuadPart;
endTime.QuadPart += numerator / 1000;
if (numerator % 1000 > 0) {
// We guarantee we won't wake up too early.
++endTime.QuadPart;
}
} else {
sleepMs = INFINITE;
}
while (!predicate.check()) {
if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
// Normal result. Continue loop to check predicate.
} else {
DWORD error = GetLastError();
if (error == ERROR_TIMEOUT) {
// Timed out. Skip predicate check.
return;
} else {
KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
}
}
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
// Recompute sleep time.
if (timeout != nullptr) {
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
if (endTime.QuadPart > now.QuadPart) {
uint64_t numerator = (endTime.QuadPart - now.QuadPart) * 1000;
sleepMs = numerator / frequency.QuadPart;
if (numerator % frequency.QuadPart > 0) {
// We guarantee we won't wake up too early.
++sleepMs;
}
} else {
// Oops, already timed out.
return;
}
}
}
}
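
The sleep-time recomputation above boils down to a ceiling division against the QPC deadline; a sketch of that computation in isolation (`msUntil` is illustrative, not part of the commit):

    #include <windows.h>
    #include <stdint.h>

    // Milliseconds remaining until a QueryPerformanceCounter deadline,
    // rounded up so that a sleep of the returned length can never end
    // before the deadline.
    static DWORD msUntil(LARGE_INTEGER endTime, LARGE_INTEGER frequency) {
      LARGE_INTEGER now;
      QueryPerformanceCounter(&now);
      if (now.QuadPart >= endTime.QuadPart) return 0;  // already expired
      uint64_t numerator = (uint64_t)(endTime.QuadPart - now.QuadPart) * 1000;
      uint64_t freq = (uint64_t)frequency.QuadPart;
      DWORD ms = (DWORD)(numerator / freq);
      if (numerator % freq > 0) ++ms;  // ceil: guarantee no early wakeup
      return ms;
    }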
void Mutex::induceSpuriousWakeupForTest() {
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
WakeConditionVariable(&coercedCondvar(waiter->condvar));
} else {
// No more waiters.
break;
}
}
}
static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) {
return true;
}
@@ -377,9 +662,7 @@ void Once::reset() {
} \
}
Mutex::Mutex() {
KJ_PTHREAD_CALL(pthread_rwlock_init(&mutex, nullptr));
}
Mutex::Mutex(): mutex(PTHREAD_RWLOCK_INITIALIZER) {}
Mutex::~Mutex() {
KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}
@@ -396,7 +679,37 @@ void Mutex::lock(Exclusivity exclusivity) {
}
void Mutex::unlock(Exclusivity exclusivity) {
KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex));
KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
if (exclusivity == EXCLUSIVE) {
// Check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use _signal() vs. _broadcast() here since there's always only one thread waiting.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
// We only need to wake one waiter. Note that unlike the futex-based implementation, we
// cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
// that the condition is still true when that waiter finally awakes. However, if the
// condition is no longer true at that point, the waiter will re-check all other waiters'
// conditions and possibly wake up any other waiter who is now ready, hence we still only
// need to wake one waiter here.
break;
}
} else {
// No more waiters.
break;
}
}
}
}
void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
@@ -419,9 +732,129 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
Once::Once(bool startInitialized): state(startInitialized ? INITIALIZED : UNINITIALIZED) {
KJ_PTHREAD_CALL(pthread_mutex_init(&mutex, nullptr));
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
// Add waiter to list.
Waiter waiter {
nullptr, waitersTail, predicate, nullptr,
PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
};
addWaiter(waiter);
KJ_DEFER({
if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
// Destroy pthread objects.
KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
});
#if !__APPLE__
if (timeout != nullptr) {
// Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
// clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
// a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
pthread_condattr_t attr;
KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
KJ_PTHREAD_CALL(pthread_cond_init(&waiter.condvar, &attr));
KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
}
#endif
Maybe<struct timespec> endTime = timeout.map([](Duration d) {
return toAbsoluteTimespec(now() + d);
});
while (!predicate.check()) {
// pthread condvars only work with basic mutexes, not rwlocks. So, we need to lock a basic
// mutex before we unlock the real mutex, and the signaling thread also needs to lock this
// mutex, in order to ensure that this thread is actually waiting on the condvar before it is
// signaled.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
// OK, now we can unlock the main mutex.
unlock(EXCLUSIVE);
currentlyLocked = false;
bool timedOut = false;
// Wait for someone to signal the condvar.
KJ_IF_MAYBE(t, endTime) {
#if __APPLE__
// On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
// which means modifying the system clock will break the wait. However, macOS happens to
// provide an alternative relative-time wait function, so I guess we'll use that. It does
// require recomputing the time every iteration...
struct timespec ts = toRelativeTimespec(kj::max(toTimePoint(*t) - now(), 0 * kj::SECONDS));
int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
#else
int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
#endif
if (error != 0) {
if (error == ETIMEDOUT) {
timedOut = true;
} else {
KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
}
}
} else {
KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
}
// We have to be very careful about lock ordering here. We need to unlock stupidMutex before
// re-locking the main mutex, because another thread may have a lock on the main mutex already
// and be waiting for a lock on stupidMutex. Note that the other thread may signal the condvar
// right after we unlock stupidMutex but before we re-lock the main mutex. That is fine,
// because we've already been signaled.
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
lock(EXCLUSIVE);
currentlyLocked = true;
KJ_IF_MAYBE(exception, waiter.exception) {
// The predicate threw an exception, apparently. Propagate it.
// TODO(someday): Could we somehow have this be a recoverable exception? Presumably we'd
// then want MutexGuarded::when() to skip calling the callback, but then what should it
// return, since it normally returns the callback's result? Or maybe people who disable
// exceptions just really should not write predicates that can throw.
kj::throwFatalException(kj::mv(**exception));
}
if (timedOut) {
return;
}
}
}
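
The lock-ordering dance in this function is the heart of the pthread port, so here it is in isolation: a stripped-down sketch of waiting on a condvar while the real state is guarded by an rwlock. All names are illustrative and error checking is omitted for brevity:

    #include <pthread.h>

    // Waiting side: take the helper mutex BEFORE releasing the rwlock. A
    // signaler must hold the rwlock exclusively and then the helper mutex,
    // so it can only deliver the signal once this thread is safely inside
    // pthread_cond_wait(); the wakeup cannot be lost in the gap.
    void waitFor(pthread_rwlock_t* guard, pthread_mutex_t* helper,
                 pthread_cond_t* condvar, bool (*ready)()) {
      pthread_rwlock_wrlock(guard);
      while (!ready()) {
        pthread_mutex_lock(helper);          // 1. helper first, guard still held
        pthread_rwlock_unlock(guard);        // 2. now release the guard
        pthread_cond_wait(condvar, helper);  // 3. atomically releases helper
        pthread_mutex_unlock(helper);        // 4. drop helper BEFORE re-locking
        pthread_rwlock_wrlock(guard);        //    the guard, to avoid inversion
      }
      pthread_rwlock_unlock(guard);
    }

    // Signaling side, called while holding `guard` exclusively (in kj this
    // happens inside unlock(), just before the rwlock is released).
    void wake(pthread_mutex_t* helper, pthread_cond_t* condvar) {
      pthread_mutex_lock(helper);    // blocks until the waiter is in cond_wait
      pthread_cond_signal(condvar);
      pthread_mutex_unlock(helper);
    }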
void Mutex::induceSpuriousWakeupForTest() {
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
KJ_PTHREAD_CALL(pthread_cond_signal(&waiter->condvar));
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter->stupidMutex));
} else {
// No more waiters.
break;
}
}
}
Once::Once(bool startInitialized)
: state(startInitialized ? INITIALIZED : UNINITIALIZED),
mutex(PTHREAD_MUTEX_INITIALIZER) {}
Once::~Once() {
KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
}
c++/src/kj/mutex.h
@@ -27,6 +27,7 @@
#include "memory.h"
#include <inttypes.h>
#include "time.h"
#if __linux__ && !defined(KJ_USE_FUTEX)
#define KJ_USE_FUTEX 1
@@ -40,6 +41,8 @@
namespace kj {
class Exception;
// =======================================================================================
// Private details -- public interfaces follow below.
@@ -66,15 +69,20 @@ public:
// non-trivial, assert that the mutex is locked (which should be good enough to catch problems
// in unit tests). In non-debug builds, do nothing.
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
class Predicate {
public:
virtual bool check() = 0;
};
void lockWhen(Predicate& predicate);
// Lock (exclusively) when predicate.check() returns true.
#endif
void lockWhen(Predicate& predicate, Maybe<Duration> timeout = nullptr);
// Lock (exclusively) when predicate.check() returns true, or when the timeout (if any) expires.
// The mutex is always locked when this returns regardless of whether the timeout expired (and
// always unlocked if it throws).
void induceSpuriousWakeupForTest();
// Utility method for mutex-test.c++ which causes a spurious thread wakeup on all threads that
// are waiting for a lockWhen() condition. Assuming correct implementation, all those threads
// should immediately go back to sleep.
private:
#if KJ_USE_FUTEX
@@ -89,17 +97,40 @@ private:
static constexpr uint EXCLUSIVE_REQUESTED = 1u << 30;
static constexpr uint SHARED_COUNT_MASK = EXCLUSIVE_REQUESTED - 1;
struct Waiter;
kj::Maybe<Waiter&> waitersHead = nullptr;
kj::Maybe<Waiter&>* waitersTail = &waitersHead;
// linked list of waitUntil()s; can only modify under lock
#elif _WIN32
uintptr_t srwLock; // Actually an SRWLOCK, but don't want to #include <windows.h> in header.
#else
mutable pthread_rwlock_t mutex;
#endif
struct Waiter {
kj::Maybe<Waiter&> next;
kj::Maybe<Waiter&>* prev;
Predicate& predicate;
Maybe<Own<Exception>> exception;
#if KJ_USE_FUTEX
uint futex;
bool hasTimeout;
#elif _WIN32
uintptr_t condvar;
// Actually CONDITION_VARIABLE, but don't want to #include <windows.h> in header.
#else
pthread_cond_t condvar;
pthread_mutex_t stupidMutex;
// pthread condvars are only compatible with basic pthread mutexes, not rwlocks, for no
// particularly good reason. To work around this, we need an extra mutex per condvar.
#endif
};
kj::Maybe<Waiter&> waitersHead = nullptr;
kj::Maybe<Waiter&>* waitersTail = &waitersHead;
// linked list of lockWhen() waiters; can only modify under lock
inline void addWaiter(Waiter& waiter);
inline void removeWaiter(Waiter& waiter);
bool checkPredicate(Waiter& waiter);
};
class Once {
@@ -220,6 +251,14 @@ private:
friend class MutexGuarded;
template <typename U>
friend class ExternalMutexGuarded;
#if KJ_MUTEX_TEST
public:
#endif
void induceSpuriousWakeupForTest() { mutex->induceSpuriousWakeupForTest(); }
// Utility method for mutex-test.c++ which causes a spurious thread wakeup on all threads that
// are waiting for a when() condition. Assuming correct implementation, all those threads should
// immediately go back to sleep.
};
template <typename T>
@@ -265,19 +304,23 @@ public:
inline T& getAlreadyLockedExclusive() const;
// Like `getWithoutLock()`, but asserts that the lock is already held by the calling thread.
#if KJ_USE_FUTEX // TODO(someday): Implement on pthread & win32
template <typename Cond, typename Func>
auto when(Cond&& condition, Func&& callback) const -> decltype(callback(instance<T&>())) {
auto when(Cond&& condition, Func&& callback, Maybe<Duration> timeout = nullptr) const
-> decltype(callback(instance<T&>())) {
// Waits until condition(state) returns true, then calls callback(state) under lock.
//
// `condition`, when called, receives as its parameter a const reference to the state, which is
// locked (either shared or exclusive). `callback` returns a mutable reference, which is
// locked (either shared or exclusive). `callback` receives a mutable reference, which is
// exclusively locked.
//
// `condition()` may be called multiple times, from multiple threads, while waiting for the
// condition to become true. It may even return true once, but then be called more times.
// It is guaranteed, though, that at the time `callback()` is finally called, `condition()`
// would currently return true (assuming it is a pure function of the guarded data).
//
// If `timeout` is specified, then after the given amount of time, the callback will be called
// regardless of whether the condition is true. In this case, when `callback()` is called,
// `condition()` may in fact evaluate false, but *only* if the timeout was reached.
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
@@ -292,11 +335,10 @@ public:
};
PredicateImpl impl(kj::fwd<Cond>(condition), value);
mutex.lockWhen(impl);
mutex.lockWhen(impl, timeout);
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
return callback(value);
}
#endif
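
One consequence of the timeout semantics documented above: on expiry the callback still runs, so callers who need to distinguish the two cases must re-check the state themselves, as the new tests do. A sketch of one way to do that (illustrative only):

    // Return a Maybe so that a timeout is distinguishable from success.
    kj::MutexGuarded<uint> value(0);
    kj::Maybe<uint> result = value.when(
        [](const uint& n) { return n >= 100; },
        [](uint& n) -> kj::Maybe<uint> {
          if (n < 100) return nullptr;  // timed out before the condition held
          return n;                     // condition actually became true
        },
        5 * kj::SECONDS);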
private:
mutable _::Mutex mutex;