Commit ab033b7f authored by Kenton Varda's avatar Kenton Varda

Extend MutexGuarded::when() with support for timeouts.

parent ee08272c
......@@ -19,6 +19,13 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1  // Trim <windows.h> to the essentials; speeds up compilation.
#define WINVER 0x0600
#define _WIN32_WINNT 0x0600
#define NOGDI // NOGDI is needed to make EXPECT_EQ(123u, *lock) compile for some reason
#endif
#include "mutex.h"
#include "debug.h"
#include "thread.h"
......@@ -26,12 +33,12 @@
#include <stdlib.h>
#if _WIN32
#define NOGDI // NOGDI is needed to make EXPECT_EQ(123u, *lock) compile for some reason
#include <windows.h>
#undef NOGDI
#else
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#endif
namespace kj {
......@@ -39,8 +46,16 @@ namespace {
#if _WIN32
// Sleep briefly so racing test threads get a chance to run; used to sequence the test's threads.
inline void delay() { Sleep(10); }
// Current time on a steady clock. GetTickCount64() is monotonic (unaffected by wall-clock
// adjustments), matching the clock the mutex timeout implementation uses on Windows.
TimePoint now() {
  return kj::origin<TimePoint>() + GetTickCount64() * kj::MILLISECONDS;
}
#else
// Sleep ~10ms so racing test threads get a chance to run.
inline void delay() { usleep(10000); }
// Current time on CLOCK_MONOTONIC, matching the clock the POSIX mutex timeout code uses.
TimePoint now() {
  // NOTE(review): the local `now` shadows the function's own name; legal, but easy to misread.
  struct timespec now;
  KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &now));
  return kj::origin<TimePoint>() + now.tv_sec * kj::SECONDS + now.tv_nsec * kj::NANOSECONDS;
}
#endif
TEST(Mutex, MutexGuarded) {
......@@ -170,6 +185,83 @@ TEST(Mutex, When) {
}
}
// Exercises MutexGuarded::when() with a timeout: immediate satisfaction, satisfaction by
// another thread, a many-thread stress test, and actual timeout expiration.
TEST(Mutex, WhenWithTimeout) {
  MutexGuarded<uint> value(123);

  // A timeout that won't expire.
  static constexpr Duration LONG_TIMEOUT = 10 * kj::SECONDS;

  {
    // Predicate is already true on entry, so the callback runs immediately; the timeout is
    // irrelevant here. Callback mutates the guarded value (123 -> 124) and returns 126.
    uint m = value.when([](uint n) { return n < 200; }, [](uint& n) {
      ++n;
      return n + 2;
    }, LONG_TIMEOUT);
    KJ_EXPECT(m == 126);
    KJ_EXPECT(*value.lockShared() == 124);
  }

  {
    // Predicate is initially false; a background thread makes it true before the (long)
    // timeout expires. Callback then sees 321, mutates it to 322, and returns 324.
    kj::Thread thread([&]() {
      delay();
      *value.lockExclusive() = 321;
    });

    uint m = value.when([](uint n) { return n > 200; }, [](uint& n) {
      ++n;
      return n + 2;
    }, LONG_TIMEOUT);
    KJ_EXPECT(m == 324);
    KJ_EXPECT(*value.lockShared() == 322);
  }

  {
    // Stress test. 100 threads each wait for a value and then set the next value.
    *value.lockExclusive() = 0;

    auto threads = kj::heapArrayBuilder<kj::Own<kj::Thread>>(100);
    for (auto i: kj::zeroTo(100)) {
      threads.add(kj::heap<kj::Thread>([i,&value]() {
        if (i % 2 == 0) delay();
        // Post-increment: returns the observed value i, then bumps it to i + 1 so the next
        // thread's predicate becomes true.
        uint m = value.when([i](const uint& n) { return n == i; },
            [](uint& n) { return n++; }, LONG_TIMEOUT);
        KJ_ASSERT(m == i);
      }));
    }

    // Wait for the chain to complete (value reaches 100), then bump it once more to 101.
    uint m = value.when([](uint n) { return n == 100; }, [](uint& n) {
      return n++;
    }, LONG_TIMEOUT);
    KJ_EXPECT(m == 100);
    KJ_EXPECT(*value.lockShared() == 101);
  }

  {
    // Timeout expiration: the predicate (n == 0) can never become true (value is 101), so
    // when() must invoke the callback anyway once the 10ms timeout elapses.
    auto start = now();
    uint m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
      KJ_ASSERT(n == 101);
      KJ_EXPECT(now() - start >= 10 * kj::MILLISECONDS);
      return 12;
    }, 10 * kj::MILLISECONDS);
    KJ_EXPECT(m == 12);

    // A second timed-out wait: at least another 10ms must have elapsed since `start`.
    m = value.when([](uint n) { return n == 0; }, [&](uint& n) {
      KJ_ASSERT(n == 101);
      KJ_EXPECT(now() - start >= 20 * kj::MILLISECONDS);
      return 34;
    }, 10 * kj::MILLISECONDS);
    KJ_EXPECT(m == 34);

    // Sanity check: a predicate that is already true still short-circuits the timeout.
    m = value.when([](uint n) { return n > 0; }, [&](uint& n) {
      KJ_ASSERT(n == 101);
      return 56;
    }, LONG_TIMEOUT);
    KJ_EXPECT(m == 56);
  }
}
TEST(Mutex, Lazy) {
Lazy<uint> lazy;
volatile bool initStarted = false;
......
......@@ -28,6 +28,11 @@
#include "mutex.h"
#include "debug.h"
#if !_WIN32
#include <time.h>
#include <errno.h>
#endif
#if KJ_USE_FUTEX
#include <unistd.h>
#include <sys/syscall.h>
......@@ -66,6 +71,27 @@ inline void Mutex::removeWaiter(Waiter& waiter) {
}
}
#if !_WIN32
namespace {
// Returns the current time read from CLOCK_MONOTONIC, so elapsed-time arithmetic is
// immune to wall-clock adjustments.
TimePoint now() {
  struct timespec ts;
  KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &ts));
  return kj::origin<TimePoint>() + ts.tv_sec * kj::SECONDS + ts.tv_nsec * kj::NANOSECONDS;
}
// Converts a kj::Duration into a struct timespec, splitting it into whole seconds plus
// the leftover nanoseconds.
struct timespec toRelativeTimespec(Duration timeout) {
  struct timespec result;
  result.tv_sec = timeout / kj::SECONDS;
  result.tv_nsec = timeout % kj::SECONDS / kj::NANOSECONDS;
  return result;
}
// Converts an absolute kj::TimePoint into a struct timespec. A TimePoint is just an
// offset from the clock's origin, so we reuse the Duration conversion on that offset.
struct timespec toAbsoluteTimespec(TimePoint time) {
  Duration sinceOrigin = time - kj::origin<TimePoint>();
  return toRelativeTimespec(sinceOrigin);
}
} // namespace
#endif
#if KJ_USE_FUTEX
// =======================================================================================
// Futex-based implementation (Linux-only)
......@@ -133,7 +159,28 @@ void Mutex::unlock(Exclusivity exclusivity) {
if (waiter->predicate.check()) {
// This waiter's predicate now evaluates true, so wake it up.
if (waiter->hasTimeout) {
// In this case we need to be careful to make sure the target thread isn't already
// processing a timeout, so we need to do an atomic CAS rather than just a store.
uint expected = 0;
if (__atomic_compare_exchange_n(&waiter->futex, &expected, 1, false,
__ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
// Good, we set it to 1, transferring ownership of the mutex. Continue on below.
} else {
// Looks like the thread already timed out and set its own futex to 1. In that
// case it is going to try to lock the mutex itself, so we should NOT attempt an
// ownership transfer as this will deadlock.
//
// We have two options here: We can continue along the waiter list looking for
// another waiter that's ready to be signaled, or we could drop out of the list
// immediately since we know that another thread is already waiting for the lock
// and will re-evaluate the waiter queue itself when it is done. It feels cleaner
// to me to continue.
continue;
}
} else {
__atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
}
syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
// We transferred ownership of the lock to this waiter, so we're done now.
......@@ -191,27 +238,80 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate) {
// Futex-based (Linux) implementation: acquire the lock exclusively; if `predicate` is not yet
// true, release it and sleep until an unlock() observes the predicate became true and hands
// ownership over via this waiter's futex, or until `timeout` (if any) expires. On return the
// mutex is always held exclusively, even on timeout.
//
// NOTE(review): this text is a rendered commit diff; a few pre-change lines are interleaved
// below and are flagged as remnants — confirm against the actual post-commit file.
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
  lock(EXCLUSIVE);

  // Since the predicate might throw, we should be careful to remember if we've locked the mutex
  // and unlock it on the way out.
  bool currentlyLocked = true;
  KJ_ON_SCOPE_FAILURE({
    if (currentlyLocked) unlock(EXCLUSIVE);
  });

  // Add waiter to list. The extra `hasTimeout` flag tells unlock() it must CAS (not plain-store)
  // the futex, because a timed-out waiter may race to claim its own futex.
  // NOTE(review): the first declaration below is a pre-change remnant from the diff rendering.
  Waiter waiter { nullptr, waitersTail, predicate, 0 };
  Waiter waiter { nullptr, waitersTail, predicate, 0, timeout != nullptr };
  addWaiter(waiter);
  KJ_DEFER(removeWaiter(waiter));

  if (!predicate.check()) {
    // Predicate not satisfied yet: drop the lock and wait for an unlock() to hand it back.
    unlock(EXCLUSIVE);
    currentlyLocked = false;

    // NOTE(review): the next two lines are pre-change remnants from the diff rendering
    // (the old relative-timeout wait loop that this commit replaces).
    // Wait for someone to set our futex to 1.
    while (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) == 0) {
    syscall(SYS_futex, &waiter.futex, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);

    // Precompute the absolute deadline once, up front; the bitset futex wait takes it directly.
    struct timespec ts;
    struct timespec* tsp = nullptr;
    KJ_IF_MAYBE(t, timeout) {
      ts = toAbsoluteTimespec(now() + *t);
      tsp = &ts;
    }

    // Wait for someone to set our futex to 1.
    for (;;) {
      // Note we use FUTEX_WAIT_BITSET_PRIVATE + FUTEX_BITSET_MATCH_ANY to get the same effect as
      // FUTEX_WAIT_PRIVATE except that the timeout is specified as an absolute time based on
      // CLOCK_MONOTONIC. Otherwise, FUTEX_WAIT_PRIVATE interprets it as a relative time, forcing
      // us to recompute the time after every iteration.
      KJ_SYSCALL_HANDLE_ERRORS(syscall(SYS_futex,
          &waiter.futex, FUTEX_WAIT_BITSET_PRIVATE, 0, tsp, NULL, FUTEX_BITSET_MATCH_ANY)) {
        case EAGAIN:
          // Indicates that the futex was already non-zero by the time the kernel looked at it.
          // Not an error.
          break;
        case ETIMEDOUT: {
          // Wait timed out. This leaves us in a bit of a pickle: Ownership of the mutex was not
          // transferred to us from another thread. So, we need to lock it ourselves. But, another
          // thread might be in the process of signaling us and transferring ownership. So, we
          // first must atomically take control of our destiny.
          KJ_ASSERT(timeout != nullptr);
          uint expected = 0;
          if (__atomic_compare_exchange_n(&waiter.futex, &expected, 1, false,
              __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
            // OK, we set our own futex to 1. That means no other thread will, and so we won't be
            // receiving a mutex ownership transfer. We have to lock the mutex ourselves.
            lock(EXCLUSIVE);
            currentlyLocked = true;
            return;
          } else {
            // Oh, someone else actually did signal us, apparently. Let's move on as if the futex
            // call told us so.
            break;
          }
        }
        default:
          KJ_FAIL_SYSCALL("futex(FUTEX_WAIT_PRIVATE)", error);
      }

      // Ownership of an exclusive lock was transferred to us. We can continue.
      if (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) != 0) {
        // We received a lock ownership transfer from another thread.
        currentlyLocked = true;

        // The other thread checked the predicate before the transfer.
#ifdef KJ_DEBUG
        assertLockedByCaller(EXCLUSIVE);
#endif
        return;
      }
    }
  }
}
......@@ -345,7 +445,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
// held for debug purposes anyway, we just don't bother.
}
void Mutex::lockWhen(Predicate& predicate) {
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Add waiter to list.
......@@ -357,12 +457,27 @@ void Mutex::lockWhen(Predicate& predicate) {
addWaiter(waiter);
KJ_DEFER(removeWaiter(waiter));
Maybe<TimePoint> endTime = timeout.map([](Duration d) {
return kj::origin<TimePoint>() + GetTickCount64() * kj::MILLISECONDS + d;
});
while (!predicate.check()) {
// SleepConditionVariableSRW(), unlike other modern Win32 synchronization functions, can fail.
// However, I'm guessing the only possible error is ERROR_TIMEOUT, which should never happen
// to us...
KJ_WIN32(SleepConditionVariableSRW(
&coercedCondvar(waiter.condvar), &coercedSrwLock, INFINITE, 0));
DWORD millis = endTime.map([](TimePoint end) {
TimePoint now = kj::origin<TimePoint>() + GetTickCount64() * kj::MILLISECONDS;
return kj::max(end - now, 0 * kj::MILLISECONDS) / kj::MILLISECONDS;
}).orDefault(INFINITE);
if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, millis, 0)) {
// Normal result. Continue loop to check predicate.
} else {
DWORD error = GetLastError();
if (error == ERROR_TIMEOUT) {
// Timed out. Skip predicate check.
return;
} else {
KJ_FAIL_WIN32("SleepConditionVariableSRW()", error);
}
}
}
}
......@@ -494,7 +609,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate) {
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Add waiter to list.
......@@ -508,8 +623,25 @@ void Mutex::lockWhen(Predicate& predicate) {
removeWaiter(waiter);
// Destroy pthread objects.
KJ_PTHREAD_CALL(pthread_mutex_destroy(&waiter.stupidMutex));
KJ_PTHREAD_CALL(pthread_cond_destroy(&waiter.condvar));
KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
KJ_PTHREAD_CLEANUP(pthread_cond_destroy(&waiter.condvar));
});
#if !__APPLE__
if (timeout != nullptr) {
// Oops, the default condvar uses the wall clock, which is dumb... fix it to use the monotonic
// clock. (Except not on macOS, where pthread_condattr_setclock() is unimplemented, but there's
// a bizarre pthread_cond_timedwait_relative_np() method we can use instead...)
pthread_condattr_t attr;
KJ_PTHREAD_CALL(pthread_condattr_init(&attr));
KJ_PTHREAD_CALL(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
pthread_cond_init(&waiter.condvar, &attr);
KJ_PTHREAD_CALL(pthread_condattr_destroy(&attr));
}
#endif
Maybe<struct timespec> endTime = timeout.map([](Duration d) {
return toAbsoluteTimespec(now() + d);
});
while (!predicate.check()) {
......@@ -522,8 +654,31 @@ void Mutex::lockWhen(Predicate& predicate) {
// OK, now we can unlock the main mutex.
unlock(EXCLUSIVE);
bool timedOut = false;
// Wait for someone to signal the condvar.
KJ_IF_MAYBE(t, endTime) {
#if __APPLE__
// On macOS, the absolute timeout can only be specified in wall time, not monotonic time,
// which means modifying the system clock will break the wait. However, macOS happens to
// provide an alternative relative-time wait function, so I guess we'll use that. It does
// require recomputing the time every iteration...
struct timespec ts = toRelativeTimespec(
kj::max(toAbsoluteTimespec(*t) - now(), 0 * kj::SECONDS));
int error = pthread_cond_timedwait_relative_np(&waiter.condvar, &waiter.stupidMutex, &ts);
#else
int error = pthread_cond_timedwait(&waiter.condvar, &waiter.stupidMutex, t);
#endif
if (error != 0) {
if (error == ETIMEDOUT) {
timedOut = true;
} else {
KJ_FAIL_SYSCALL("pthread_cond_timedwait", error);
}
}
} else {
KJ_PTHREAD_CALL(pthread_cond_wait(&waiter.condvar, &waiter.stupidMutex));
}
// We have to be very careful about lock ordering here. We need to unlock stupidMutex before
// re-locking the main mutex, because another thread may have a lock on the main mutex already
......@@ -533,6 +688,10 @@ void Mutex::lockWhen(Predicate& predicate) {
KJ_PTHREAD_CALL(pthread_mutex_unlock(&waiter.stupidMutex));
lock(EXCLUSIVE);
if (timedOut) {
return;
}
}
}
......
......@@ -27,6 +27,7 @@
#include "memory.h"
#include <inttypes.h>
#include "time.h"
#if __linux__ && !defined(KJ_USE_FUTEX)
#define KJ_USE_FUTEX 1
......@@ -71,8 +72,10 @@ public:
virtual bool check() = 0;
};
void lockWhen(Predicate& predicate);
// Lock (exclusively) when predicate.check() returns true.
void lockWhen(Predicate& predicate, Maybe<Duration> timeout = nullptr);
// Lock (exclusively) when predicate.check() returns true, or when the timeout (if any) expires.
// The mutex is always locked when this returns regardless of whether the timeout expired (and
// always unlocked if it throws).
private:
#if KJ_USE_FUTEX
......@@ -100,6 +103,7 @@ private:
Predicate& predicate;
#if KJ_USE_FUTEX
uint futex;
bool hasTimeout;
#elif _WIN32
uintptr_t condvar;
// Actually CONDITION_VARIABLE, but don't want to #include <windows.h> in header.
......@@ -284,7 +288,8 @@ public:
// Like `getWithoutLock()`, but asserts that the lock is already held by the calling thread.
template <typename Cond, typename Func>
auto when(Cond&& condition, Func&& callback) const -> decltype(callback(instance<T&>())) {
auto when(Cond&& condition, Func&& callback, Maybe<Duration> timeout = nullptr) const
-> decltype(callback(instance<T&>())) {
// Waits until condition(state) returns true, then calls callback(state) under lock.
//
// `condition`, when called, receives as its parameter a const reference to the state, which is
......@@ -295,6 +300,10 @@ public:
// condition to become true. It may even return true once, but then be called more times.
// It is guaranteed, though, that at the time `callback()` is finally called, `condition()`
// would currently return true (assuming it is a pure function of the guarded data).
//
// If `timeout` is specified, then after the given amount of time, the callback will be called
// regardless of whether the condition is true. In this case, when `callback()` is called,
// `condition()` may in fact evaluate false, but *only* if the timeout was reached.
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
......@@ -309,7 +318,7 @@ public:
};
PredicateImpl impl(kj::fwd<Cond>(condition), value);
mutex.lockWhen(impl);
mutex.lockWhen(impl, timeout);
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
return callback(value);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment