Unverified Commit 5b93ce92 authored by Kenton Varda, committed by GitHub

Merge pull request #857 from capnproto/cross-thread-events

 Extend KJ event loop to support cross-thread events.
parents f81a83ac 10c18fef
......@@ -2,7 +2,11 @@
ACLOCAL_AMFLAGS = -I m4
# We use serial-tests so that test output will be written directly to stdout
# which is much preferred in CI environments where the test logs may be hard
# to get at after the fact. Most of our tests are bundled into a single
# executable anyway so cannot easily be parallelized.
AUTOMAKE_OPTIONS = foreign subdir-objects serial-tests
# When running distcheck, verify that we've included all the files needed by
# the cmake build.
......@@ -460,8 +464,11 @@ else !LITE_MODE
check_PROGRAMS = capnp-test capnp-evolution-test capnp-afl-testcase
heavy_tests = \
src/kj/async-test.c++ \
src/kj/async-xthread-test.c++ \
src/kj/async-unix-test.c++ \
src/kj/async-unix-xthread-test.c++ \
src/kj/async-win32-test.c++ \
src/kj/async-win32-xthread-test.c++ \
src/kj/async-io-test.c++ \
src/kj/parse/common-test.c++ \
src/kj/parse/char-test.c++ \
......
......@@ -215,8 +215,11 @@ if(BUILD_TESTING)
if(NOT CAPNP_LITE)
add_executable(kj-heavy-tests
async-test.c++
async-xthread-test.c++
async-unix-test.c++
async-unix-xthread-test.c++
async-win32-test.c++
async-win32-xthread-test.c++
async-io-test.c++
refcount-test.c++
string-tree-test.c++
......
......@@ -78,12 +78,43 @@ public:
Maybe<T> value;
};
template <typename T>
inline T convertToReturn(ExceptionOr<T>&& result) {
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
inline void convertToReturn(ExceptionOr<Void>&& result) {
// Override <void> case to use throwRecoverableException().
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this
// internally.
public:
Event();
Event(kj::EventLoop& loop);
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
......@@ -105,6 +136,10 @@ public:
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
void disarm();
// If the event is armed but hasn't fired, cancel it. (Destroying the event does this
// implicitly.)
kj::String trace();
// Dump debug info about this event.
......@@ -165,6 +200,7 @@ protected:
void init(Event* newEvent);
void arm();
void armBreadthFirst();
// Arms the event if init() has already been called and makes future calls to init()
// automatically arm the event.
......@@ -877,40 +913,8 @@ Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) {
template <typename T>
T Promise<T>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::FixVoid<T>> result;
_::waitImpl(kj::mv(node), result, waitScope);
return convertToReturn(kj::mv(result));
}
template <typename T>
......@@ -1139,4 +1143,157 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() {
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
}
// =======================================================================================
// cross-thread stuff
namespace _ { // (private)
class XThreadEvent: private Event, // it's an event in the target thread
public PromiseNode { // it's a PromiseNode in the requesting thread
public:
XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor)
: Event(targetExecutor.loop), result(result), targetExecutor(targetExecutor) {}
protected:
void ensureDoneOrCanceled();
// MUST be called in destructor of subclasses to make sure the object is not destroyed while
// still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
// that destructor doesn't run until the subclass has already been destroyed.)
virtual kj::Maybe<Own<PromiseNode>> execute() = 0;
// Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
// returns null.
template <typename T>
Own<PromiseNode> extractNode(Promise<T> promise) { return kj::mv(promise.node); }
// implements PromiseNode ----------------------------------------------------
void onReady(Event* event) noexcept override;
private:
ExceptionOrValue& result;
const Executor& targetExecutor;
Maybe<const Executor&> replyExecutor; // If executeAsync() was used.
kj::Maybe<Own<PromiseNode>> promiseNode;
// Accessed only in target thread.
Maybe<XThreadEvent&> targetNext;
Maybe<XThreadEvent&>* targetPrev = nullptr;
// Membership in one of the target Executor's linked lists (the work list or the cancel list).
// These fields are protected by the target Executor's mutex.
enum {
UNUSED,
// Object was never queued on another thread.
QUEUED,
// Target thread has not yet dequeued the event from crossThreadRequests.events. The requesting
// thread can cancel execution by removing the event from the list.
EXECUTING,
// Target thread has dequeued the event and is executing it. To cancel, the requesting thread
// must add the event to the crossThreadRequests.cancel list.
DONE
// Target thread has completed handling this event and will not touch it again. The requesting
// thread can safely delete the object. The `state` is updated to `DONE` using an atomic
// release operation after ensuring that the event will not be touched again, so that the
// requesting thread can safely skip locking if it observes the state is already DONE.
} state = UNUSED;
// State, which is also protected by `targetExecutor`'s mutex.
Maybe<XThreadEvent&> replyNext;
Maybe<XThreadEvent&>* replyPrev = nullptr;
// Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
// executing thread places the event in the reply list near the end of the `EXECUTING` state.
// Because the thread cannot lock two mutexes at once, it's possible that the reply executor
// will receive the reply while the event is still listed in the EXECUTING state, but it can
// ignore the state and proceed with the result.
OnReadyEvent onReadyEvent;
// Accessed only in requesting thread.
friend class kj::Executor;
void done();
class DelayedDoneHack;
// implements Event ----------------------------------------------------------
Maybe<Own<Event>> fire() override;
// If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
// then it just indicated readiness and we need to get its result.
};
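The state machine above has exactly one lock-free edge: the target thread publishes DONE with a release store, and the requesting thread may skip the mutex when an acquire load already observes DONE. A minimal standalone sketch of that handshake, using the same GCC/Clang atomic builtins as the implementation (the `Handshake` type and the value 42 are illustrative, not KJ code):

// Illustrative sketch of the release/acquire handshake described in the comments above.
enum State { UNUSED, QUEUED, EXECUTING, DONE };

struct Handshake {
  int state = UNUSED;
  int result = 0;  // stands in for the event's result slot

  void finishInTargetThread() {
    result = 42;                                       // all writes to the event happen first...
    __atomic_store_n(&state, DONE, __ATOMIC_RELEASE);  // ...then DONE is published
  }

  bool tryFastPathInRequestingThread() {
    // The acquire load pairs with the release store: observing DONE guarantees that
    // `result` is visible too, so no mutex is needed on this path.
    if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) == DONE) {
      return result == 42;
    }
    return false;  // fall back to locking the target Executor's mutex
  }
};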
template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
class XThreadEventImpl final: public XThreadEvent {
// Implementation for a function that does not return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target)
: XThreadEvent(result, target), func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
return nullptr;
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
template <typename Func, typename T>
class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
// Implementation for a function that DOES return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target)
: XThreadEvent(result, target), func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
auto result = extractNode(func());
KJ_IREQUIRE(result.get() != nullptr);
return kj::mv(result);
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
} // namespace _ (private)
template <typename Func>
_::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(Func&& func) const {
_::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this);
send(event, true);
return convertToReturn(kj::mv(event.result));
}
template <typename Func>
PromiseForResult<Func, void> Executor::executeAsync(Func&& func) const {
auto event = kj::heap<_::XThreadEventImpl<Func>>(kj::fwd<Func>(func), *this);
send(*event, false);
return PromiseForResult<Func, void>(false, kj::mv(event));
}
} // namespace kj
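For orientation before the implementation details, here is a minimal usage sketch of the new API, modeled directly on the tests added later in this commit (the values 123/456 are illustrative):

// Usage sketch, not part of the diff: run work on another thread's event loop.
#include <kj/async.h>
#include <kj/mutex.h>
#include <kj/thread.h>

int main() {
  kj::MutexGuarded<kj::Maybe<const kj::Executor&>> executor;
  kj::Own<kj::PromiseFulfiller<uint>> fulfiller;  // touched only by the worker thread

  kj::Thread thread([&]() noexcept {
    kj::EventLoop loop;
    kj::WaitScope waitScope(loop);
    auto paf = kj::newPromiseAndFulfiller<uint>();
    fulfiller = kj::mv(paf.fulfiller);
    *executor.lockExclusive() = kj::getCurrentThreadExecutor();
    // Pumping this loop is what lets cross-thread requests execute here.
    KJ_ASSERT(paf.promise.wait(waitScope) == 123);
  });

  const kj::Executor* exec;
  {
    auto lock = executor.lockExclusive();
    lock.wait([](kj::Maybe<const kj::Executor&> value) { return value != nullptr; });
    exec = &KJ_ASSERT_NONNULL(*lock);
  }

  // Runs the lambda inside the worker's event loop and blocks for its result.
  uint i = exec->executeSync([&]() { fulfiller->fulfill(123); return 456u; });
  KJ_ASSERT(i == 456);
  return 0;  // ~Thread joins the worker, which exits once its promise resolves
}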
......@@ -68,6 +68,12 @@ using ReducePromises = decltype(reducePromiseType((T*)nullptr, false));
// reduces Promise<T> to something else. In particular this allows Promise<capnp::RemotePromise<U>>
// to reduce to capnp::RemotePromise<U>.
template <typename T> struct UnwrapPromise_;
template <typename T> struct UnwrapPromise_<Promise<T>> { typedef T Type; };
template <typename T>
using UnwrapPromise = typename UnwrapPromise_<T>::Type;
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
// arbitrary type which simply propagates the exception.
......@@ -186,6 +192,7 @@ template <typename T>
class ForkHub;
class Event;
class XThreadEvent;
class PromiseBase {
public:
......@@ -206,6 +213,7 @@ private:
template <typename U>
friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises);
friend Promise<void> kj::joinPromises(Array<Promise<void>>&& promises);
friend class XThreadEvent;
};
void detach(kj::Promise<void>&& promise);
......
......@@ -37,6 +37,7 @@
#include <sys/wait.h>
#include <sys/time.h>
#include <errno.h>
#include "mutex.h"
namespace kj {
namespace {
......@@ -678,14 +679,48 @@ TEST(AsyncUnixTest, Wake) {
EXPECT_FALSE(port.wait());
}
// Test wake() when already wait()ing.
{
Thread thread([&]() {
delay();
port.wake();
});
EXPECT_TRUE(port.wait());
}
// Test wait() after wake() already happened.
{
Thread thread([&]() {
port.wake();
});
delay();
EXPECT_TRUE(port.wait());
}
// Test wake() during poll() busy loop.
{
Thread thread([&]() {
delay();
port.wake();
});
EXPECT_FALSE(port.poll());
while (!port.poll()) {}
}
// Test poll() when wake() already delivered.
{
EXPECT_FALSE(port.poll());
Thread thread([&]() {
port.wake();
});
delay();
EXPECT_TRUE(port.poll());
}
}
int exitCodeForSignal = 0;
......@@ -707,7 +742,7 @@ struct TestChild {
sigset_t sigs;
sigemptyset(&sigs);
sigaddset(&sigs, SIGTERM);
pthread_sigmask(SIG_UNBLOCK, &sigs, nullptr);
for (;;) pause();
}
......@@ -740,8 +775,8 @@ TEST(AsyncUnixTest, ChildProcess) {
sigset_t sigs, oldsigs;
KJ_SYSCALL(sigemptyset(&sigs));
KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &sigs, &oldsigs));
KJ_DEFER(KJ_SYSCALL(pthread_sigmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });
TestChild child1(port, 123);
KJ_EXPECT(!child1.promise.poll(waitScope));
......@@ -774,6 +809,75 @@ TEST(AsyncUnixTest, ChildProcess) {
// child3 will be killed and synchronously waited on the way out.
}
#if !__CYGWIN__
// TODO(someday): Figure out why whenWriteDisconnected() never resolves on Cygwin.
KJ_TEST("UnixEventPort whenWriteDisconnected()") {
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int fds_[2];
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_));
kj::AutoCloseFd fds[2] = { kj::AutoCloseFd(fds_[0]), kj::AutoCloseFd(fds_[1]) };
UnixEventPort::FdObserver observer(port, fds[0], UnixEventPort::FdObserver::OBSERVE_READ);
// At one point, the poll()-based version of UnixEventPort had a bug where if some other event
// had completed previously, whenWriteDisconnected() would stop being watched for. So we watch
// for readability as well and check that that goes away first.
auto readablePromise = observer.whenBecomesReadable();
auto hupPromise = observer.whenWriteDisconnected();
KJ_EXPECT(!readablePromise.poll(waitScope));
KJ_EXPECT(!hupPromise.poll(waitScope));
KJ_SYSCALL(write(fds[1], "foo", 3));
KJ_ASSERT(readablePromise.poll(waitScope));
readablePromise.wait(waitScope);
{
char junk[16];
ssize_t n;
KJ_SYSCALL(n = read(fds[0], junk, 16));
KJ_EXPECT(n == 3);
}
KJ_EXPECT(!hupPromise.poll(waitScope));
fds[1] = nullptr;
KJ_ASSERT(hupPromise.poll(waitScope));
hupPromise.wait(waitScope);
}
#endif
KJ_TEST("UnixEventPort poll for signals") {
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto promise1 = port.onSignal(SIGURG);
auto promise2 = port.onSignal(SIGIO);
KJ_EXPECT(!promise1.poll(waitScope));
KJ_EXPECT(!promise2.poll(waitScope));
KJ_SYSCALL(raise(SIGURG));
KJ_SYSCALL(raise(SIGIO));
port.wake();
KJ_EXPECT(port.poll());
KJ_EXPECT(promise1.poll(waitScope));
KJ_EXPECT(promise2.poll(waitScope));
promise1.wait(waitScope);
promise2.wait(waitScope);
}
} // namespace
} // namespace kj
......
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if !_WIN32
#include "async-unix.h"
#define KJ_XTHREAD_TEST_SETUP_LOOP \
UnixEventPort port; \
EventLoop loop(port); \
WaitScope waitScope(loop)
#include "async-xthread-test.c++"
#endif // !_WIN32
......@@ -31,14 +31,15 @@
#include <pthread.h>
#include <map>
#include <sys/wait.h>
#include <unistd.h>
#if KJ_USE_EPOLL
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/eventfd.h>
#else
#include <poll.h>
#include <fcntl.h>
#endif
namespace kj {
......@@ -56,6 +57,19 @@ bool threadClaimedChildExits = false;
struct SignalCapture {
sigjmp_buf jumpTo;
siginfo_t siginfo;
sigset_t originalMask;
// The signal mask to be restored when jumping out of the signal handler.
//
// "But wait!" you say, "Isn't the whole point of siglongjmp() that it does this for you?" Well,
// yes, that is supposed to be the point. However, Apple implemented it wrong. On macOS,
// siglongjmp() uses sigprocmask() -- not pthread_sigmask() -- to restore the signal mask.
// Unfortunately, sigprocmask() on macOS affects threads other than the current thread. Arguably
// this is conformant: sigprocmask() is documented as having unspecified behavior in the presence
// of threads, and pthread_sigmask() must be used instead. However, this means siglongjmp()
// cannot be used in the presence of threads.
//
// We'll just have to restore the signal mask ourselves, rather than rely on siglongjmp()...
};
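A condensed, self-contained illustration of the workaround described above, with the sigsetjmp() side included (a plain POSIX sketch, not the event-port code; handler registration is omitted and the names are illustrative):

// Sketch of the save-the-mask-yourself pattern that SignalCapture implements.
#include <pthread.h>
#include <setjmp.h>
#include <signal.h>

static thread_local sigjmp_buf jumpTo;
static thread_local sigset_t originalMask;

void onSignal(int) {
  // siglongjmp() can't be trusted to restore the mask portably (macOS restores it
  // with process-wide sigprocmask()), so restore it by hand with pthread_sigmask().
  pthread_sigmask(SIG_SETMASK, &originalMask, nullptr);
  siglongjmp(jumpTo, 1);
}

bool waitForSignal(const sigset_t& waitMask) {
  pthread_sigmask(SIG_SETMASK, nullptr, &originalMask);  // save the current mask
  if (sigsetjmp(jumpTo, false)) {  // false: don't have sigsetjmp save the mask
    return true;                   // jumped here from onSignal(); mask already restored
  }
  sigsuspend(&waitMask);           // deliver exactly one pending/incoming signal
  return false;                    // not reached if a handled signal arrives
}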
#if !KJ_USE_EPOLL // on Linux we'll use signalfd
......@@ -65,7 +79,14 @@ void signalHandler(int, siginfo_t* siginfo, void*) {
SignalCapture* capture = threadCapture;
if (capture != nullptr) {
capture->siginfo = *siginfo;
siglongjmp(capture->jumpTo, 1);
// See comments on SignalCapture::originalMask, above: We can't rely on siglongjmp() to restore
// the signal mask; we must do it ourselves using pthread_sigmask(). We pass false as the
// second parameter to sigsetjmp() so that it skips saving the signal mask in the first place,
// which makes it equivalent to `longjmp()` on Linux or `_longjmp()` on BSD/macOS.
pthread_sigmask(SIG_SETMASK, &capture->originalMask, nullptr);
siglongjmp(capture->jumpTo, false);
}
}
#endif
......@@ -76,7 +97,7 @@ void registerSignalHandler(int signum) {
sigset_t mask;
KJ_SYSCALL(sigemptyset(&mask));
KJ_SYSCALL(sigaddset(&mask, signum));
KJ_SYSCALL(pthread_sigmask(SIG_BLOCK, &mask, nullptr));
#if !KJ_USE_EPOLL // on Linux we'll use signalfd
struct sigaction action;
......@@ -88,10 +109,14 @@ void registerSignalHandler(int signum) {
#endif
}
#if !KJ_USE_EPOLL && !KJ_USE_PIPE_FOR_WAKEUP
void registerReservedSignal() {
registerSignalHandler(reservedSignal);
}
#endif
void ignoreSigpipe() {
// We disable SIGPIPE because users of UnixEventPort almost certainly don't want it.
while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
int error = errno;
if (error != EINTR) {
......@@ -100,8 +125,6 @@ void registerReservedSignal() {
}
}
} // namespace
struct UnixEventPort::ChildSet {
......@@ -280,7 +303,7 @@ UnixEventPort::UnixEventPort()
epollFd(-1),
signalFd(-1),
eventFd(-1) {
ignoreSigpipe();
int fd;
KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
......@@ -610,11 +633,31 @@ bool UnixEventPort::doEpollWait(int timeout) {
UnixEventPort::UnixEventPort()
: clock(systemPreciseMonotonicClock()),
timerImpl(clock.now()) {
#if KJ_USE_PIPE_FOR_WAKEUP
// Allocate a pipe to which we'll write a byte in order to wake this thread.
int fds[2];
KJ_SYSCALL(pipe(fds));
wakePipeIn = kj::AutoCloseFd(fds[0]);
wakePipeOut = kj::AutoCloseFd(fds[1]);
KJ_SYSCALL(fcntl(wakePipeIn, F_SETFD, FD_CLOEXEC));
KJ_SYSCALL(fcntl(wakePipeOut, F_SETFD, FD_CLOEXEC));
// Make both ends non-blocking so that wake() can treat EWOULDBLOCK as "a wake is already
// pending" and the poll loop can drain the pipe without blocking.
KJ_SYSCALL(fcntl(wakePipeIn, F_SETFL, O_NONBLOCK));
KJ_SYSCALL(fcntl(wakePipeOut, F_SETFL, O_NONBLOCK));
#else
static_assert(sizeof(threadId) >= sizeof(pthread_t),
"pthread_t is larger than a long long on your platform. Please port.");
*reinterpret_cast<pthread_t*>(&threadId) = pthread_self();
// Note: We used to use a pthread_once to call registerReservedSignal() only once per process.
// This didn't work correctly because registerReservedSignal() not only registers the
// (process-wide) signal handler, but also sets the (per-thread) signal mask to block the
// signal. Thus, if threads were spawned before the first UnixEventPort was created, and then
// multiple threads created UnixEventPorts, only one of them would have the signal properly
// blocked. We could have changed things so that only the handler registration was protected
// by the pthread_once and the mask update happened in every thread, but registering a signal
// handler is not an expensive operation, so whatever... we'll do it in every thread.
registerReservedSignal();
#endif
ignoreSigpipe();
}
UnixEventPort::~UnixEventPort() noexcept(false) {}
......@@ -672,7 +715,8 @@ void UnixEventPort::FdObserver::fire(short events) {
}
}
if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr &&
hupFulfiller == nullptr) {
// Remove from list.
if (next == nullptr) {
eventPort.observersTail = prev;
......@@ -761,16 +805,25 @@ Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
class UnixEventPort::PollContext {
public:
PollContext(UnixEventPort& port) {
for (FdObserver* ptr = port.observersHead; ptr != nullptr; ptr = ptr->next) {
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = ptr->fd;
pollfd.events = ptr->getEventMask();
pollfds.add(pollfd);
pollEvents.add(ptr);
}
#if KJ_USE_PIPE_FOR_WAKEUP
{
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = port.wakePipeIn;
pollfd.events = POLLIN;
pollfds.add(pollfd);
}
#endif
}
void run(int timeout) {
......@@ -786,19 +839,36 @@ public:
}
}
bool processResults() {
if (pollResult < 0) {
KJ_FAIL_SYSCALL("poll()", pollError);
}
bool woken = false;
for (auto i: indices(pollfds)) {
if (pollfds[i].revents != 0) {
#if KJ_USE_PIPE_FOR_WAKEUP
if (i == pollEvents.size()) {
// The last pollfd is our cross-thread wake pipe.
woken = true;
// Discard junk in the wake pipe.
char junk[256];
ssize_t n;
do {
KJ_NONBLOCKING_SYSCALL(n = read(pollfds[i].fd, junk, sizeof(junk)));
} while (n >= 256);
} else {
#endif
pollEvents[i]->fire(pollfds[i].revents);
#if KJ_USE_PIPE_FOR_WAKEUP
}
#endif
if (--pollResult <= 0) {
break;
}
}
}
return woken;
}
private:
......@@ -811,7 +881,10 @@ private:
bool UnixEventPort::wait() {
sigset_t newMask;
sigemptyset(&newMask);
#if !KJ_USE_PIPE_FOR_WAKEUP
sigaddset(&newMask, reservedSignal);
#endif
{
auto ptr = signalHead;
......@@ -824,41 +897,44 @@ bool UnixEventPort::wait() {
}
}
PollContext pollContext(*this);
// Capture signals.
SignalCapture capture;
if (sigsetjmp(capture.jumpTo, false)) {
// We received a signal and longjmp'd back out of the signal handler.
threadCapture = nullptr;
#if !KJ_USE_PIPE_FOR_WAKEUP
if (capture.siginfo.si_signo == reservedSignal) {
return true;
} else {
#endif
gotSignal(capture.siginfo);
return false;
#if !KJ_USE_PIPE_FOR_WAKEUP
}
#endif
}
// Enable signals, run the poll, then mask them again.
threadCapture = &capture;
pthread_sigmask(SIG_UNBLOCK, &newMask, &capture.originalMask);
pollContext.run(
timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, int(maxValue))
.map([](uint64_t t) -> int { return t; })
.orDefault(-1));
pthread_sigmask(SIG_SETMASK, &capture.originalMask, nullptr);
threadCapture = nullptr;
// Queue events.
bool result = pollContext.processResults();
timerImpl.advanceTo(clock.now());
return result;
}
bool UnixEventPort::poll() {
......@@ -874,11 +950,13 @@ bool UnixEventPort::poll() {
KJ_SYSCALL(sigpending(&pending));
uint signalCount = 0;
#if !KJ_USE_PIPE_FOR_WAKEUP
if (sigismember(&pending, reservedSignal)) {
++signalCount;
sigdelset(&pending, reservedSignal);
sigdelset(&waitMask, reservedSignal);
}
#endif
{
auto ptr = signalHead;
......@@ -894,29 +972,53 @@ bool UnixEventPort::poll() {
// Wait for each pending signal. It would be nice to use sigtimedwait() here but it is not
// available on OSX. :( Instead, we call sigsuspend() once per expected signal.
{
SignalCapture capture;
pthread_sigmask(SIG_SETMASK, nullptr, &capture.originalMask);
threadCapture = &capture;
KJ_DEFER(threadCapture = nullptr);
while (signalCount-- > 0) {
if (sigsetjmp(capture.jumpTo, false)) {
// We received a signal and longjmp'd back out of the signal handler.
sigdelset(&waitMask, capture.siginfo.si_signo);
#if !KJ_USE_PIPE_FOR_WAKEUP
if (capture.siginfo.si_signo == reservedSignal) {
woken = true;
} else {
#endif
gotSignal(capture.siginfo);
#if !KJ_USE_PIPE_FOR_WAKEUP
}
#endif
} else {
#if __CYGWIN__
// Cygwin's sigpending() incorrectly reports signals pending for any thread, not just our
// own thread. As a work-around, instead of using sigsuspend() (which would block forever
// if the signal is not pending on *this* thread), we un-mask the signals and immediately
// mask them again. If any signals are pending, they *should* be delivered before the first
// sigprocmask() returns, and the handler will then longjmp() to the block above. If it
// turns out no signal is pending, we'll block the signals again and break out of the
// loop.
//
// Bug reported here: https://cygwin.com/ml/cygwin/2019-07/msg00051.html
sigprocmask(SIG_SETMASK, &waitMask, nullptr);
sigprocmask(SIG_SETMASK, &capture.originalMask, nullptr);
break;
#else
sigsuspend(&waitMask);
KJ_FAIL_ASSERT("sigsuspend() shouldn't return because the signal handler should "
"have siglongjmp()ed.");
#endif
}
}
}
{
PollContext pollContext(*this);
pollContext.run(0);
if (pollContext.processResults()) {
woken = true;
}
}
timerImpl.advanceTo(clock.now());
......@@ -924,10 +1026,20 @@ bool UnixEventPort::poll() {
}
void UnixEventPort::wake() const {
#if KJ_USE_PIPE_FOR_WAKEUP
// We're going to write() a single byte to our wake pipe in order to cause poll() to complete in
// the target thread.
//
// If this write() fails with EWOULDBLOCK, we don't care, because the target thread is already
// scheduled to wake up.
char c = 0;
KJ_NONBLOCKING_SYSCALL(write(wakePipeOut, &c, 1));
#else
int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal);
if (error != 0) {
KJ_FAIL_SYSCALL("pthread_kill", error);
}
#endif
}
#endif // KJ_USE_EPOLL, else
......
......@@ -40,6 +40,14 @@
#define KJ_USE_EPOLL 1
#endif
#if __CYGWIN__ && !defined(KJ_USE_PIPE_FOR_WAKEUP)
// Cygwin has serious issues with the intersection of signals and threads, reported here:
// https://cygwin.com/ml/cygwin/2019-07/msg00052.html
// On Cygwin, therefore, we do not use signals to wake threads. Instead, each thread allocates a
// pipe, and we write a byte to the pipe to wake the thread... ick.
#define KJ_USE_PIPE_FOR_WAKEUP 1
#endif
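For readers who haven't seen it, the approach described above is the classic "self-pipe trick". A minimal standalone sketch in plain POSIX terms (the names `wakeFds`, `wake()`, and `waitForWake()` are illustrative, not KJ code):

// Standalone sketch of the pipe-based wakeup described above.
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

static int wakeFds[2];  // [0] = read end, [1] = write end

void setupWakePipe() {
  pipe(wakeFds);
  fcntl(wakeFds[0], F_SETFL, O_NONBLOCK);
  fcntl(wakeFds[1], F_SETFL, O_NONBLOCK);
}

// Called from any thread: a byte in the pipe makes the target thread's poll() return.
void wake() {
  char c = 0;
  (void)write(wakeFds[1], &c, 1);  // EAGAIN means a wake is already pending; fine
}

// Called in the event thread; in real code the wake fd is polled alongside all the
// other observed fds.
void waitForWake() {
  struct pollfd pfd;
  pfd.fd = wakeFds[0];
  pfd.events = POLLIN;
  pfd.revents = 0;
  poll(&pfd, 1, -1);
  char junk[256];
  while (read(wakeFds[0], junk, sizeof(junk)) > 0) {}  // drain all pending wakes
}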
namespace kj {
class UnixEventPort: public EventPort {
......@@ -169,7 +177,12 @@ private:
FdObserver* observersHead = nullptr;
FdObserver** observersTail = &observersHead;
#if KJ_USE_PIPE_FOR_WAKEUP
AutoCloseFd wakePipeIn;
AutoCloseFd wakePipeOut;
#else
unsigned long long threadId; // actually pthread_t
#endif
#endif
struct ChildSet;
......
......@@ -24,6 +24,7 @@
#include "async-win32.h"
#include "thread.h"
#include "test.h"
#include "mutex.h"
namespace kj {
namespace {
......
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#include "async-win32.h"
#define KJ_XTHREAD_TEST_SETUP_LOOP \
Win32IocpEventPort port; \
EventLoop loop(port); \
WaitScope waitScope(loop)
#include "async-xthread-test.c++"
#endif // _WIN32
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "async.h"
#include "debug.h"
#include "thread.h"
#include "mutex.h"
#include <kj/test.h>
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#include <windows.h>
#include "windows-sanity.h"
inline void delay() { Sleep(10); }
#else
#include <unistd.h>
inline void delay() { usleep(10000); }
#endif
// This file is #included from async-unix-xthread-test.c++ and async-win32-xthread-test.c++ after
// defining KJ_XTHREAD_TEST_SETUP_LOOP to set up a loop with the corresponding EventPort.
#ifndef KJ_XTHREAD_TEST_SETUP_LOOP
#define KJ_XTHREAD_TEST_SETUP_LOOP \
EventLoop loop; \
WaitScope waitScope(loop)
#endif
namespace kj {
namespace {
KJ_TEST("synchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}).wait(waitScope));
Promise<uint> promise = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456u;
});
KJ_EXPECT(promise.wait(waitScope) == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("synchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(i == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}).wait(waitScope));
Promise<uint> promise2 = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(promise2.wait(waitScope) == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event before it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
*executor.lockExclusive() = getCurrentThreadExecutor();
// We never run the loop here, so that when the event is canceled, it's still queued.
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
volatile bool called = false;
{
Promise<uint> promise = exec->executeAsync([&]() { called = true; return 123u; });
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
KJ_EXPECT(!called);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event while it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
volatile bool called = false;
Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> {
called = true;
return kj::NEVER_DONE;
});
while (!called) {
delay();
}
KJ_EXPECT(!promise.poll(waitScope));
}
exec->executeSync([&]() { fulfiller->fulfill(); });
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cross-thread cancellation in both directions at once") {
MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
MutexGuarded<uint> readyCount(0);
thread_local uint threadNumber = 0;
thread_local bool receivedFinalCall = false;
// Code to execute simultaneously in two threads...
// We mark this noexcept so that any exceptions thrown will immediately invoke the termination
// handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor,
uint threadCount) noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
*selfExecutor.lockExclusive() = getCurrentThreadExecutor();
const Executor* exec;
{
auto lock = otherExecutor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
// Create a ton of cross-thread promises to cancel.
Vector<Promise<void>> promises;
for (uint i = 0; i < 1000; i++) {
promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
return kj::Promise<void>(kj::NEVER_DONE)
.attach(kj::defer([wasThreadNumber = threadNumber]() {
// Make sure destruction happens in the correct thread.
KJ_ASSERT(threadNumber == wasThreadNumber);
}));
}));
}
// Signal other thread that we're done queueing, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([&](uint i) { return i >= threadCount; });
}
// Run event loop to start all executions queued by the other thread.
waitScope.poll();
loop.run();
// Signal other thread that we've run the loop, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([&](uint i) { return i >= threadCount * 2; });
}
// Cancel all the promises.
promises.clear();
// All our cancellations completed, but the other thread may still be waiting for some
// cancellations from us. We need to pump our event loop to make sure we continue handling
// those cancellation requests. In particular we'll queue a function to the other thread and
// wait for it to complete. The other thread will queue its own function to this thread just
// before completing the function we queued to it.
receivedFinalCall = false;
exec->executeAsync([&]() { receivedFinalCall = true; }).wait(waitScope);
// To be safe, make sure we've actually executed the function that the other thread queued to
// us by repeatedly polling until `receivedFinalCall` becomes true in this thread.
while (!receivedFinalCall) {
waitScope.poll();
loop.run();
}
// OK, signal other that we're all done.
*otherExecutor.lockExclusive() = nullptr;
// Wait until other thread sets executor to null, as a way to tell us to quit.
selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
{
Thread thread([&]() {
threadNumber = 1;
simultaneous(childExecutor, parentExecutor, 2);
});
threadNumber = 0;
simultaneous(parentExecutor, childExecutor, 2);
}
// Let's even have a three-thread version, with cyclic cancellation requests.
MutexGuarded<kj::Maybe<const Executor&>> child2Executor;
*readyCount.lockExclusive() = 0;
{
Thread thread1([&]() {
threadNumber = 1;
simultaneous(childExecutor, child2Executor, 3);
});
Thread thread2([&]() {
threadNumber = 2;
simultaneous(child2Executor, parentExecutor, 3);
});
threadNumber = 0;
simultaneous(parentExecutor, childExecutor, 3);
}
}
KJ_TEST("cross-thread cancellation cycle") {
// Another multi-way cancellation test where we set up an actual cycle between three threads
// waiting on each other to complete a single event.
MutexGuarded<kj::Maybe<const Executor&>> child1Executor, child2Executor;
Own<PromiseFulfiller<void>> fulfiller1, fulfiller2;
auto threadMain = [](MutexGuarded<kj::Maybe<const Executor&>>& executor,
Own<PromiseFulfiller<void>>& fulfiller) noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
Thread thread1([&]() noexcept { threadMain(child1Executor, fulfiller1); });
Thread thread2([&]() noexcept { threadMain(child2Executor, fulfiller2); });
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto& parentExecutor = getCurrentThreadExecutor();
const Executor* exec1;
{
auto lock = child1Executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec1 = &KJ_ASSERT_NONNULL(*lock);
}
const Executor* exec2;
{
auto lock = child2Executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec2 = &KJ_ASSERT_NONNULL(*lock);
}
// Create an event that cycles through both threads and back to this one, and then cancel it.
bool cycleAllDestroyed = false;
{
auto paf = kj::newPromiseAndFulfiller<void>();
Promise<uint> promise = exec1->executeAsync([&]() -> kj::Promise<uint> {
return exec2->executeAsync([&]() -> kj::Promise<uint> {
return parentExecutor.executeAsync([&]() -> kj::Promise<uint> {
paf.fulfiller->fulfill();
return kj::Promise<uint>(kj::NEVER_DONE).attach(kj::defer([&]() {
cycleAllDestroyed = true;
}));
});
});
});
// Wait until the cycle has come all the way around.
paf.promise.wait(waitScope);
KJ_EXPECT(!promise.poll(waitScope));
}
KJ_EXPECT(cycleAllDestroyed);
exec1->executeSync([&]() { fulfiller1->fulfill(); });
exec2->executeSync([&]() { fulfiller2->fulfill(); });
*child1Executor.lockExclusive() = nullptr;
*child2Executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("call own thread's executor") {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto& executor = getCurrentThreadExecutor();
{
uint i = executor.executeSync([]() {
return 123u;
});
KJ_EXPECT(i == 123);
}
KJ_EXPECT_THROW_MESSAGE(
"can't call executeSync() on own thread's executor with a promise-returning function",
executor.executeSync([]() { return kj::evalLater([]() {}); }));
{
uint i = executor.executeAsync([]() {
return 123u;
}).wait(waitScope);
KJ_EXPECT(i == 123);
}
}
} // namespace
} // namespace kj
......@@ -19,15 +19,21 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#endif
#include "async.h"
#include "debug.h"
#include "vector.h"
#include "threadlocal.h"
#include "mutex.h"
#if KJ_USE_FUTEX
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#if _WIN32
#include <windows.h> // just for Sleep(0)
#include "windows-sanity.h"
#else
#include <sched.h> // just for sched_yield()
#endif
#if !KJ_NO_RTTI
......@@ -268,26 +274,412 @@ public:
LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();
} // namespace _ (private)
// =======================================================================================
struct Executor::Impl {
typedef Maybe<_::XThreadEvent&> _::XThreadEvent::*NextMember;
typedef Maybe<_::XThreadEvent&>* _::XThreadEvent::*PrevMember;
template <NextMember next, PrevMember prev>
struct List {
kj::Maybe<_::XThreadEvent&> head;
kj::Maybe<_::XThreadEvent&>* tail = &head;
bool empty() const {
return head == nullptr;
}
void insert(_::XThreadEvent& event) {
KJ_REQUIRE(event.*prev == nullptr);
*tail = event;
event.*prev = tail;
tail = &(event.*next);
}
void erase(_::XThreadEvent& event) {
KJ_REQUIRE(event.*prev != nullptr);
*(event.*prev) = event.*next;
KJ_IF_MAYBE(n, event.*next) {
n->*prev = event.*prev;
} else {
KJ_DASSERT(tail == &(event.*next));
tail = event.*prev;
}
event.*next = nullptr;
event.*prev = nullptr;
}
template <typename Func>
void forEach(Func&& func) {
kj::Maybe<_::XThreadEvent&> current = head;
for (;;) {
KJ_IF_MAYBE(c, current) {
auto nextItem = c->*next;
func(*c);
current = nextItem;
} else {
break;
}
}
}
};
struct State {
// Queues of notifications from other threads that need this thread's attention.
List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> run;
List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> cancel;
List<&_::XThreadEvent::replyNext, &_::XThreadEvent::replyPrev> replies;
bool waitingForCancel = false;
// True if this thread is currently blocked waiting for some other thread to pump its
// cancellation queue. If that other thread tries to block on *this* thread, then it could
// deadlock -- it must take precautions against this.
bool empty() const {
return run.empty() && cancel.empty() && replies.empty();
}
void dispatchAll(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
run.forEach([&](_::XThreadEvent& event) {
run.erase(event);
event.state = _::XThreadEvent::EXECUTING;
event.armBreadthFirst();
});
dispatchCancels(eventsToCancelOutsideLock);
replies.forEach([&](_::XThreadEvent& event) {
replies.erase(event);
event.onReadyEvent.armBreadthFirst();
});
}
void dispatchCancels(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
cancel.forEach([&](_::XThreadEvent& event) {
cancel.erase(event);
if (event.promiseNode == nullptr) {
event.state = _::XThreadEvent::DONE;
} else {
// We can't destroy the promiseNode while the mutex is locked, because we don't know
// what the destructor might do. But, we *must* destroy it before acknowledging
// cancellation. So we have to add it to a list to destroy later.
eventsToCancelOutsideLock.add(&event);
}
});
}
};
kj::MutexGuarded<State> state;
// After modifying state from another thread, the loop's port.wake() must be called.
void processAsyncCancellations(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
// After calling dispatchAll() or dispatchCancels() with the lock held, it may be that some
// cancellations require dropping the lock before destroying the promiseNode. In that case
// those cancellations will be added to the eventsToCancelOutsideLock Vector passed to the
// method. That vector must then be passed to processAsyncCancellations() as soon as the lock
// is released.
for (auto& event: eventsToCancelOutsideLock) {
event->promiseNode = nullptr;
event->disarm();
}
// Now we need to mark all the events "done" under lock.
auto lock = state.lockExclusive();
for (auto& event: eventsToCancelOutsideLock) {
event->state = _::XThreadEvent::DONE;
}
}
};
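The `List<next, prev>` template above lets a single XThreadEvent sit in several queues at once by parameterizing the list over pointers-to-members. A reduced sketch of the same technique with raw pointers (the `Node` type is hypothetical, not KJ code):

// Reduced illustration of the pointer-to-member intrusive list used by Executor::Impl.
struct Node {
  Node* runNext = nullptr;    // links for the "run" queue
  Node** runPrev = nullptr;
  Node* replyNext = nullptr;  // independent links for the "reply" queue
  Node** replyPrev = nullptr;
};

template <Node* Node::*next, Node** Node::*prev>
struct IntrusiveList {
  Node* head = nullptr;
  Node** tail = &head;  // points at the last node's `next` field (or at `head`)

  void insert(Node& n) {
    *tail = &n;           // append to whatever `tail` points at
    n.*prev = tail;
    tail = &(n.*next);
  }

  void erase(Node& n) {
    *(n.*prev) = n.*next;  // unlink from the predecessor
    if (n.*next != nullptr) {
      (n.*next)->*prev = n.*prev;
    } else {
      tail = n.*prev;      // we were the tail; back it up
    }
    n.*next = nullptr;
    n.*prev = nullptr;
  }
};

// One Node can be in both queues simultaneously:
//   IntrusiveList<&Node::runNext, &Node::runPrev> runQueue;
//   IntrusiveList<&Node::replyNext, &Node::replyPrev> replyQueue;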
namespace _ { // (private)
void XThreadEvent::ensureDoneOrCanceled() {
#if _MSC_VER
{ // TODO(perf): TODO(msvc): Implement the double-checked lock optimization on MSVC.
#else
if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
#endif
auto lock = targetExecutor.impl->state.lockExclusive();
switch (state) {
case UNUSED:
// Nothing to do.
break;
case QUEUED:
lock->run.erase(*this);
// No wake needed since we removed work rather than adding it.
state = DONE;
break;
case EXECUTING: {
lock->cancel.insert(*this);
KJ_IF_MAYBE(p, targetExecutor.loop.port) {
p->wake();
}
Maybe<Executor&> maybeSelfExecutor = nullptr;
if (threadLocalEventLoop != nullptr) {
KJ_IF_MAYBE(e, threadLocalEventLoop->executor) {
maybeSelfExecutor = *e;
}
}
KJ_IF_MAYBE(selfExecutor, maybeSelfExecutor) {
// If, while waiting for other threads to process our cancellation request, we have
// cancellation requests queued back to this thread, we must process them. Otherwise,
// we could deadlock with two threads waiting on each other to process cancellations.
//
// We don't have a terribly good way to detect this, except to check if the remote
// thread is itself waiting for cancellations and, if so, wake ourselves up to check for
// cancellations to process. This will busy-loop but at least it should eventually
// resolve assuming fair scheduling.
//
// To make things extra-annoying, in order to update our waitingForCancel flag, we have
// to lock our own executor state, but we can't take both locks at once, so we have to
// release the other lock in the meantime.
// Make sure we unset waitingForCancel on the way out.
KJ_DEFER({
lock = {};
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = false;
selfLock->dispatchCancels(eventsToCancelOutsideLock);
// We don't need to re-take the lock on the other executor here; it's not used again
// after this scope.
});
while (state != DONE) {
bool otherThreadIsWaiting = lock->waitingForCancel;
// Make sure our waitingForCancel is on and dispatch any pending cancellations on this
// thread.
lock = {};
{
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = true;
// Note that we don't have to proactively delete the PromiseNodes extracted from
// the canceled events because those nodes belong to this thread and can't possibly
// continue executing while we're blocked here.
selfLock->dispatchCancels(eventsToCancelOutsideLock);
}
if (otherThreadIsWaiting) {
// We know the other thread was waiting for cancellations to complete a moment ago.
// We may have just processed the necessary cancellations in this thread, in which
// case the other thread needs a chance to receive control and notice this. Or, it
// may be that the other thread is waiting for some third thread to take action.
// Either way, we should yield control here to give things a chance to settle.
// Otherwise we could end up in a tight busy loop.
#if _WIN32
Sleep(0);
#else
sched_yield();
#endif
}
// OK now we can take the original lock again.
lock = targetExecutor.impl->state.lockExclusive();
// OK, now we can wait for the other thread to either process our cancellation or
// indicate that it is waiting for remote cancellation.
lock.wait([&](const Executor::Impl::State& executorState) {
return state == DONE || executorState.waitingForCancel;
});
}
} else {
// We have no executor of our own so we don't have to worry about cancellation cycles
// causing deadlock.
//
// NOTE: I don't think we can actually get here, because it implies that this is a
// synchronous execution, which means there's no way to cancel it.
lock.wait([&](auto&) { return state == DONE; });
}
KJ_DASSERT(targetPrev == nullptr);
break;
}
case DONE:
// Became done while we waited for lock. Nothing to do.
break;
}
}
KJ_IF_MAYBE(e, replyExecutor) {
// Since we know we reached the DONE state (or never left UNUSED), we know that the remote
// thread is all done playing with our `replyPrev` pointer. Only the current thread could
// possibly modify it after this point. So we can skip the lock if it's already null.
if (replyPrev != nullptr) {
auto lock = e->impl->state.lockExclusive();
lock->replies.erase(*this);
}
}
}
void XThreadEvent::done() {
KJ_IF_MAYBE(e, replyExecutor) {
// Queue the reply.
{
auto lock = e->impl->state.lockExclusive();
lock->replies.insert(*this);
}
KJ_IF_MAYBE(p, e->loop.port) {
p->wake();
}
}
{
auto lock = targetExecutor.impl->state.lockExclusive();
KJ_DASSERT(state == EXECUTING);
if (targetPrev != nullptr) {
// We must be in the cancel list, because we can't be in the run list during EXECUTING state.
// We can remove ourselves from the cancel list because at this point we're done anyway, so
// whatever.
lock->cancel.erase(*this);
}
#if _MSC_VER
// TODO(perf): TODO(msvc): Implement the double-checked lock optimization on MSVC.
state = DONE;
#else
__atomic_store_n(&state, DONE, __ATOMIC_RELEASE);
#endif
}
}
class XThreadEvent::DelayedDoneHack: public Disposer {
// Crazy hack: In fire(), we want to call done() if the event is finished. But done() signals
// the requesting thread to wake up and possibly delete the XThreadEvent. But the caller (the
// EventLoop) still has to set `event->firing = false` after `fire()` returns, so this would be
// a use-after-free race.
//
// It just so happens, though, that fire() is allowed to return an optional `Own<Event>` to drop,
// and the caller drops that pointer immediately after setting event->firing = false. So we
// return a pointer whose disposer calls done().
//
// It's not quite as much of a hack as it seems: The whole reason fire() returns an Own<Event> is
// so that the event can delete itself, but do so after the caller sets event->firing = false.
// It just happens to be that in this case, the event isn't deleting itself, but rather releasing
// itself back to the other thread.
protected:
void disposeImpl(void* pointer) const override {
reinterpret_cast<XThreadEvent*>(pointer)->done();
}
};
NullEventPort NullEventPort::instance = NullEventPort();
Maybe<Own<Event>> XThreadEvent::fire() {
static constexpr DelayedDoneHack DISPOSER {};
} // namespace _ (private)
KJ_IF_MAYBE(n, promiseNode) {
n->get()->get(result);
promiseNode = nullptr; // make sure to destroy in the thread that created it
return Own<Event>(this, DISPOSER);
} else {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
promiseNode = execute();
})) {
result.addException(kj::mv(*exception));
};
KJ_IF_MAYBE(n, promiseNode) {
n->get()->onReady(this);
} else {
return Own<Event>(this, DISPOSER);
}
}
return nullptr;
}
void XThreadEvent::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
} // namespace _
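The DelayedDoneHack above leans on a general KJ idiom: a kj::Own<T> whose Disposer runs arbitrary cleanup instead of freeing memory. Below is a minimal standalone sketch of that idiom, under the assumption that only the public kj::Own / kj::Disposer API is used; the names Notifier, NotifyDisposer, and borrow() are invented for illustration and are not part of KJ.
#include <kj/memory.h>
#include <kj/debug.h>
class Notifier {
public:
  void done() { KJ_LOG(INFO, "owner released me"); }
};
class NotifyDisposer: public kj::Disposer {
protected:
  void disposeImpl(void* pointer) const override {
    // Runs when the Own is dropped. Note: no delete -- the object outlives the Own.
    reinterpret_cast<Notifier*>(pointer)->done();
  }
};
static constexpr NotifyDisposer NOTIFY_DISPOSER {};
kj::Own<Notifier> borrow(Notifier& n) {
  // The returned Own does not own the allocation; dropping it merely calls done(),
  // just as dropping the Own<Event> returned by fire() ends up calling XThreadEvent::done().
  return kj::Own<Notifier>(&n, NOTIFY_DISPOSER);
}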
Executor::Executor(EventLoop& loop, Badge<EventLoop>): loop(loop), impl(kj::heap<Impl>()) {}
Executor::~Executor() noexcept(false) {}
void Executor::send(_::XThreadEvent& event, bool sync) const {
KJ_ASSERT(event.state == _::XThreadEvent::UNUSED);
if (sync) {
if (threadLocalEventLoop == &loop) {
// Invoking a sync request on our own thread. Just execute it directly; if we try to queue
// it to the loop, we'll deadlock.
auto promiseNode = event.execute();
// If the function returns a promise, we have no way to pump the event loop to wait for it,
// because the event loop may already be pumping somewhere up the stack.
KJ_ASSERT(promiseNode == nullptr,
"can't call executeSync() on own thread's executor with a promise-returning function");
return;
}
} else {
event.replyExecutor = getCurrentThreadExecutor();
// Note that async requests will "just work" even if the target executor is our own thread's
// executor. In theory we could detect this case to avoid some locking and signals but that
// would be extra code complexity for probably little benefit.
}
auto lock = impl->state.lockExclusive();
event.state = _::XThreadEvent::QUEUED;
lock->run.insert(event);
KJ_IF_MAYBE(p, loop.port) {
p->wake();
} else {
// Event loop will be waiting on executor.wait(), which will be woken when we unlock the mutex.
}
if (sync) {
lock.wait([&](auto&) { return event.state == _::XThreadEvent::DONE; });
}
}
void Executor::wait() {
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive();
lock.wait([](const Impl::State& state) {
return !state.empty();
});
lock->dispatchAll(eventsToCancelOutsideLock);
}
bool Executor::poll() {
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive();
if (lock->empty()) {
return false;
} else {
lock->dispatchAll(eventsToCancelOutsideLock);
return true;
}
}
const Executor& getCurrentThreadExecutor() {
return currentEventLoop().getExecutor();
}
// =======================================================================================
......@@ -299,8 +691,7 @@ void EventPort::wake() const {
}
EventLoop::EventLoop()
: port(_::NullEventPort::instance),
daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
: daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::EventLoop(EventPort& port)
: port(port),
......@@ -382,9 +773,19 @@ bool EventLoop::isRunnable() {
return head != nullptr;
}
const Executor& EventLoop::getExecutor() {
KJ_IF_MAYBE(e, executor) {
return *e;
} else {
return executor.emplace(*this, Badge<EventLoop>());
}
}
void EventLoop::setRunnable(bool runnable) {
if (runnable != lastRunnableState) {
port.setRunnable(runnable);
KJ_IF_MAYBE(p, port) {
p->setRunnable(runnable);
}
lastRunnableState = runnable;
}
}
......@@ -402,6 +803,34 @@ void EventLoop::leaveScope() {
threadLocalEventLoop = nullptr;
}
void EventLoop::wait() {
KJ_IF_MAYBE(p, port) {
if (p->wait()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->wait();
} else {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
}
void EventLoop::poll() {
KJ_IF_MAYBE(p, port) {
if (p->poll()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
void WaitScope::poll() {
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
......@@ -412,7 +841,7 @@ void WaitScope::poll() {
for (;;) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
loop.poll();
if (!loop.isRunnable()) {
// Still no events in the queue. We're done.
......@@ -441,11 +870,11 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
if (!loop.turn()) {
// No events in the queue. Wait for callback.
counter = 0;
loop.port.wait();
loop.wait();
} else if (++counter > waitScope.busyPollInterval) {
// Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll.
counter = 0;
loop.port.poll();
loop.poll();
}
}
......@@ -473,7 +902,7 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
while (!doneEvent.fired) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
loop.poll();
if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up.
......@@ -511,30 +940,19 @@ void detach(kj::Promise<void>&& promise) {
Event::Event()
: loop(currentEventLoop()), next(nullptr), prev(nullptr) {}
Event::~Event() noexcept(false) {
if (prev != nullptr) {
if (loop.tail == &next) {
loop.tail = prev;
}
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
Event::Event(kj::EventLoop& loop)
: loop(loop), next(nullptr), prev(nullptr) {}
*prev = next;
if (next != nullptr) {
next->prev = prev;
}
}
Event::~Event() noexcept(false) {
disarm();
KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Promise destroyed from a different thread than it was created in.");
}
void Event::armDepthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread.");
"Executor to queue events cross-thread.");
if (prev == nullptr) {
next = *loop.depthFirstInsertPoint;
......@@ -557,7 +975,7 @@ void Event::armDepthFirst() {
void Event::armBreadthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread.");
"Executor to queue events cross-thread.");
if (prev == nullptr) {
next = *loop.tail;
......@@ -573,6 +991,31 @@ void Event::armBreadthFirst() {
}
}
void Event::disarm() {
if (prev != nullptr) {
if (threadLocalEventLoop != &loop && threadLocalEventLoop != nullptr) {
KJ_LOG(FATAL, "Promise destroyed from a different thread than it was created in.");
// There's no way out of this place without UB, so abort now.
abort();
}
if (loop.tail == &next) {
loop.tail = prev;
}
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
*prev = next;
if (next != nullptr) {
next->prev = prev;
}
prev = nullptr;
next = nullptr;
}
}
_::PromiseNode* Event::getInnerForTrace() {
return nullptr;
}
......@@ -654,6 +1097,17 @@ void PromiseNode::OnReadyEvent::arm() {
event = _kJ_ALREADY_READY;
}
void PromiseNode::OnReadyEvent::armBreadthFirst() {
KJ_ASSERT(event != _kJ_ALREADY_READY, "armBreadthFirst() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it.
event->armBreadthFirst();
}
event = _kJ_ALREADY_READY;
}
// -------------------------------------------------------------------
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
......
......@@ -322,6 +322,8 @@ private:
template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
friend Promise<void> joinPromises(Array<Promise<void>>&& promises);
friend class _::XThreadEvent;
friend class Executor;
};
template <typename T>
......@@ -649,6 +651,97 @@ private:
Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller;
};
// =======================================================================================
// Cross-thread execution.
class Executor {
// Executes code on another thread's event loop.
//
// Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current
// thread's event loop. You may then pass the reference to other threads to enable them to call
// back to this one.
public:
Executor(EventLoop& loop, Badge<EventLoop>);
~Executor() noexcept(false);
template <typename Func>
PromiseForResult<Func, void> executeAsync(Func&& func) const;
// Call from any thread to request that the given function be executed on the executor's thread,
// returning a promise for the result.
//
// The Promise returned by executeAsync() belongs to the requesting thread, not the executor
// thread. Hence, for example, continuations added to this promise with .then() will execute in
// the requesting thread.
//
// If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting
// thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread
// awaits the promise. Once it resolves to a final result, that result is transferred to the
// requesting thread, resolving the promise that executeAsync() returned earlier.
//
// `func` will be destroyed in the requesting thread, after the final result has been returned
// from the executor thread. This means that it is safe for `func` to capture objects that cannot
// safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference,
// so long as the functor remains live until the promise completes or is canceled, and the
// function is thread-safe.
//
// Of course, the body of `func` must be careful that any access it makes on these objects is
// safe cross-thread. For example, it must not attempt to access Promise-related objects
// cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it
// from another. Unfortunately, the usual convention of using const-correctness to enforce
// thread-safety does not work here, because applications can often ensure that `func` has
// exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe
// ways; the const qualifier is not sufficient to express this.
//
// The final return value of `func` is transferred between threads, and hence is constructed and
// destroyed in separate threads. It is the app's responsibility to make sure this is OK.
// Alternatively, the app can perhaps arrange to send the return value back to the original
// thread for destruction, if needed.
//
// TODO(now): Decide if we should automatically wrap the return value such that it will be
// returned to its own thread for destruction.
//
// If the requesting thread destroys the returned Promise, the destructor will block waiting for
// the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed
// before the Promise's destructor returns.
//
// Multiple calls to executeAsync() from the same requesting thread to the same target thread
// will be delivered in the same order in which they were requested. (However, if func() returns
// a promise, delivery of subsequent calls is not blocked on that promise. In other words, this
// call provides E-Order in the same way as Cap'n Proto.)
template <typename Func>
_::UnwrapPromise<PromiseForResult<Func, void>> executeSync(Func&& func) const;
// Schedules `func()` to execute on the executor thread, and then blocks the requesting thread
// until `func()` completes. If `func()` returns a Promise, then the wait will continue until
// that promise resolves, and the final result will be returned to the requesting thread.
//
// The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that
// loop will *not* execute while the thread is blocked. This method is particularly useful to
// allow non-event-loop threads to perform I/O via a separate event-loop thread.
//
// As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the
// executor thread has signaled completion. The return value is transferred between threads.
private:
EventLoop& loop;
struct Impl;
Own<Impl> impl;
// To avoid including mutex.h...
friend class EventLoop;
friend class _::XThreadEvent;
void send(_::XThreadEvent& event, bool sync) const;
void wait();
bool poll();
};
const Executor& getCurrentThreadExecutor();
// Get the executor for the current thread's event loop. This reference can then be passed to other
// threads.
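To make the doc comments above concrete, here is a hedged usage sketch of the new API: one thread runs an EventLoop with no EventPort (so it serves only cross-thread events), and the main thread, which has no loop at all, calls into it with executeSync(). The names executorSlot, loopThread, and shutdown are invented for the example.
#include <kj/async.h>
#include <kj/thread.h>
#include <kj/mutex.h>
#include <kj/debug.h>
int main() {
  kj::MutexGuarded<const kj::Executor*> executorSlot(nullptr);
  kj::Own<kj::PromiseFulfiller<void>> shutdown;  // only touched on the loop thread
  kj::Thread loopThread([&]() {
    kj::EventLoop loop;            // no EventPort: serves only cross-thread events
    kj::WaitScope waitScope(loop);
    auto paf = kj::newPromiseAndFulfiller<void>();
    shutdown = kj::mv(paf.fulfiller);
    // Publish the executor; creating it here also enables port-less wait() below.
    *executorSlot.lockExclusive() = &kj::getCurrentThreadExecutor();
    paf.promise.wait(waitScope);   // blocks, dispatching executeSync()/executeAsync() calls
    shutdown = nullptr;            // destroy the fulfiller on the thread that created it
  });
  const kj::Executor* exec;
  {
    auto lock = executorSlot.lockExclusive();
    lock.wait([](const kj::Executor* e) { return e != nullptr; });
    exec = *lock;
  }
  int answer = exec->executeSync([]() { return 6 * 7; });  // runs on loopThread
  KJ_ASSERT(answer == 42);
  // Fulfillers are not thread-safe, so fulfill it *on the loop thread*.
  exec->executeSync([&]() { shutdown->fulfill(); });
  return 0;  // kj::Thread joins in its destructor
}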
// =======================================================================================
// The EventLoop class
......@@ -753,8 +846,19 @@ public:
bool isRunnable();
// Returns true if run() would currently do anything, or false if the queue is empty.
const Executor& getExecutor();
// Returns an Executor that can be used to schedule events on this EventLoop from another thread.
//
// Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's
// Executor.
//
// Note that this is only needed for cross-thread scheduling. To schedule code to run later in
// the current thread, use `kj::evalLater()`, which will be more efficient.
private:
EventPort& port;
kj::Maybe<EventPort&> port;
// If null, this thread doesn't receive I/O events from the OS. It can potentially receive
// events from other threads via the Executor.
bool running = false;
// True while looping -- wait() is then not allowed.
......@@ -766,6 +870,9 @@ private:
_::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head;
kj::Maybe<Executor> executor;
// Allocated the first time getExecutor() is requested, making cross-thread requests possible.
Own<TaskSet> daemons;
bool turn();
......@@ -773,12 +880,17 @@ private:
void enterScope();
void leaveScope();
void wait();
void poll();
friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
friend class _::Event;
friend class WaitScope;
friend class Executor;
friend class _::XThreadEvent;
};
class WaitScope {
......
......@@ -503,6 +503,35 @@ constexpr bool canMemcpy() {
static_assert(kj::canMemcpy<T>(), "this code expects this type to be memcpy()-able");
#endif
template <typename T>
class Badge {
// A pattern for marking individual methods such that they can only be called from a specific
// caller class: Make the method public but give it a parameter of type `Badge<Caller>`. Only
// `Caller` can construct one, so only `Caller` can call the method.
//
// // We only allow calls from the class `Bar`.
// void foo(Badge<Bar>)
//
// The call site looks like:
//
// foo({});
//
// This pattern also works well for declaring private constructors, but still being able to use
// them with `kj::heap()`, etc.
//
// Idea from: https://awesomekling.github.io/Serenity-C++-patterns-The-Badge/
//
// Note that some forms of this idea make the copy constructor private as well, in order to
// prohibit `Badge<NotMe>(*(Badge<NotMe>*)nullptr)`. However, that would prevent badges from
// being passed through forwarding functions like `kj::heap()`, which would ruin one of the main
// use cases for this pattern in KJ. In any case, dereferencing a null pointer is UB; there are
// plenty of other ways to get access to private members if you're willing to go UB. For one-off
// debugging purposes, you might as well use `#define private public` at the top of the file.
private:
Badge() {}
friend T;
};
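A compilable sketch of the pattern described above; Widget and Registry are invented example classes, not KJ types.
#include <kj/common.h>
class Registry;
class Widget {
public:
  void attach(kj::Badge<Registry>) { attached = true; }
  // Only Registry can call attach(): nothing else can construct a Badge<Registry>.
private:
  bool attached = false;
};
class Registry {
public:
  void add(Widget& w) {
    w.attach({});  // OK: we are Registry, so `{}` may invoke Badge<Registry>'s private ctor
  }
};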
// =======================================================================================
// Equivalents to std::move() and std::forward(), since these are very commonly needed and the
// std header <utility> pulls in lots of other stuff.
......
......@@ -25,8 +25,10 @@
#include <string.h>
#include <errno.h>
#if _WIN32
#if _WIN32 || __CYGWIN__
#if !__CYGWIN__
#define strerror_r(errno,buf,len) strerror_s(buf,len,errno)
#endif
#define NOMINMAX 1
#define WIN32_LEAN_AND_MEAN 1
#define NOSERVICE 1
......@@ -35,6 +37,7 @@
#include <windows.h>
#include "windows-sanity.h"
#include "encoding.h"
#include <wchar.h>
#endif
namespace kj {
......@@ -133,7 +136,7 @@ Exception::Type typeOfErrno(int error) {
}
}
#if _WIN32
#if _WIN32 || __CYGWIN__
Exception::Type typeOfWin32Error(DWORD error) {
switch (error) {
......@@ -355,7 +358,7 @@ void Debug::Fault::init(
makeDescriptionImpl(SYSCALL, condition, osErrorNumber, nullptr, macroArgs, argValues));
}
#if _WIN32
#if _WIN32 || __CYGWIN__
void Debug::Fault::init(
const char* file, int line, Win32Result osErrorNumber,
const char* condition, const char* macroArgs, ArrayPtr<String> argValues) {
......@@ -401,7 +404,7 @@ int Debug::getOsErrorNumber(bool nonblocking) {
: result;
}
#if _WIN32
#if _WIN32 || __CYGWIN__
uint Debug::getWin32ErrorCode() {
return ::GetLastError();
}
......
......@@ -161,7 +161,7 @@ namespace kj {
for (::kj::_::Debug::Fault f(__FILE__, __LINE__, \
errorNumber, code, "" #__VA_ARGS__, __VA_ARGS__);; f.fatal())
#if _WIN32
#if _WIN32 || __CYGWIN__
#define KJ_WIN32(call, ...) \
if (auto _kjWin32Result = ::kj::_::Debug::win32Call(call)) {} else \
......@@ -241,7 +241,7 @@ namespace kj {
for (::kj::_::Debug::Fault f(__FILE__, __LINE__, \
errorNumber, code, #__VA_ARGS__, ##__VA_ARGS__);; f.fatal())
#if _WIN32
#if _WIN32 || __CYGWIN__
#define KJ_WIN32(call, ...) \
if (auto _kjWin32Result = ::kj::_::Debug::win32Call(call)) {} else \
......@@ -311,7 +311,7 @@ namespace kj {
// handleSuccessCase();
// }
#if _WIN32
#if _WIN32 || __CYGWIN__
#define KJ_WIN32_HANDLE_ERRORS(call) \
if (uint _kjWin32Error = ::kj::_::Debug::win32Call(call).number) \
......@@ -358,7 +358,7 @@ public:
typedef LogSeverity Severity; // backwards-compatibility
#if _WIN32
#if _WIN32 || __CYGWIN__
struct Win32Result {
uint number;
inline explicit Win32Result(uint number): number(number) {}
......@@ -387,7 +387,7 @@ public:
const char* condition, const char* macroArgs);
Fault(const char* file, int line, int osErrorNumber,
const char* condition, const char* macroArgs);
#if _WIN32
#if _WIN32 || __CYGWIN__
Fault(const char* file, int line, Win32Result osErrorNumber,
const char* condition, const char* macroArgs);
#endif
......@@ -401,7 +401,7 @@ public:
const char* condition, const char* macroArgs, ArrayPtr<String> argValues);
void init(const char* file, int line, int osErrorNumber,
const char* condition, const char* macroArgs, ArrayPtr<String> argValues);
#if _WIN32
#if _WIN32 || __CYGWIN__
void init(const char* file, int line, Win32Result osErrorNumber,
const char* condition, const char* macroArgs, ArrayPtr<String> argValues);
#endif
......@@ -424,7 +424,7 @@ public:
template <typename Call>
static int syscallError(Call&& call, bool nonblocking);
#if _WIN32
#if _WIN32 || __CYGWIN__
static Win32Result win32Call(int boolean);
static Win32Result win32Call(void* handle);
static Win32Result winsockCall(int result);
......@@ -520,7 +520,7 @@ inline Debug::Fault::Fault(const char* file, int line, kj::Exception::Type type,
init(file, line, type, condition, macroArgs, nullptr);
}
#if _WIN32
#if _WIN32 || __CYGWIN__
inline Debug::Fault::Fault(const char* file, int line, Win32Result osErrorNumber,
const char* condition, const char* macroArgs)
: exception(nullptr) {
......
......@@ -23,6 +23,13 @@
#define _GNU_SOURCE
#endif
#if (_WIN32 && _M_X64) || (__CYGWIN__ && __x86_64__)
// Currently the Win32 stack-trace code only supports x86_64. We could easily extend it to support
// i386 as well but it requires some code changes around how we read the context to start the
// trace.
#define KJ_USE_WIN32_DBGHELP 1
#endif
#include "exception.h"
#include "string.h"
#include "debug.h"
......@@ -51,18 +58,23 @@
#include <execinfo.h>
#endif
#if _WIN32
#if _WIN32 || __CYGWIN__
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include "windows-sanity.h"
#include <dbghelp.h>
#endif
#if (__linux__ || __APPLE__)
#if (__linux__ || __APPLE__ || __CYGWIN__)
#include <stdio.h>
#include <pthread.h>
#endif
#if __CYGWIN__
#include <sys/cygwin.h>
#include <ucontext.h>
#endif
#if KJ_HAS_LIBDL
#include "dlfcn.h"
#endif
......@@ -81,10 +93,7 @@ StringPtr KJ_STRINGIFY(LogSeverity severity) {
return SEVERITY_STRINGS[static_cast<uint>(severity)];
}
#if _WIN32 && _M_X64
// Currently the Win32 stack-trace code only supports x86_64. We could easily extend it to support
// i386 as well but it requires some code changes around how we read the context to start the
// trace.
#if KJ_USE_WIN32_DBGHELP
namespace {
......@@ -136,6 +145,13 @@ const Dbghelp& getDbghelp() {
ArrayPtr<void* const> getStackTrace(ArrayPtr<void*> space, uint ignoreCount,
HANDLE thread, CONTEXT& context) {
// NOTE: Apparently there is a function CaptureStackBackTrace() that is equivalent to glibc's
// backtrace(). Somehow I missed that when I originally wrote this. However,
// CaptureStackBackTrace() does not accept a CONTEXT parameter; it can only trace the caller.
// That's more problematic on Windows where breakHandler(), sehHandler(), and Cygwin signal
// handlers all depend on the ability to pass a CONTEXT. So we'll keep this code, which works
// after all.
const Dbghelp& dbghelp = getDbghelp();
if (dbghelp.stackWalk64 == nullptr ||
dbghelp.symFunctionTableAccess64 == nullptr ||
......@@ -179,7 +195,7 @@ ArrayPtr<void* const> getStackTrace(ArrayPtr<void*> space, uint ignoreCount) {
return nullptr;
}
#if _WIN32 && _M_X64
#if KJ_USE_WIN32_DBGHELP
CONTEXT context;
RtlCaptureContext(&context);
return getStackTrace(space, ignoreCount, GetCurrentThread(), context);
......@@ -207,7 +223,7 @@ String stringifyStackTrace(ArrayPtr<void* const> trace) {
return nullptr;
}
#if _WIN32 && _M_X64 && _MSC_VER
#if KJ_USE_WIN32_DBGHELP && _MSC_VER
// Try to get file/line using SymGetLineFromAddr64(). We don't bother if we aren't on MSVC since
// this requires MSVC debug info.
......@@ -232,7 +248,7 @@ String stringifyStackTrace(ArrayPtr<void* const> trace) {
return strArray(lines, "");
#elif (__linux__ || __APPLE__) && !__ANDROID__
#elif (__linux__ || __APPLE__ || __CYGWIN__) && !__ANDROID__
// We want to generate a human-readable stack trace.
// TODO(someday): It would be really great if we could avoid farming out to another process
......@@ -274,6 +290,16 @@ String stringifyStackTrace(ArrayPtr<void* const> trace) {
// The Mac OS X equivalent of addr2line is atos.
// (Internally, it uses the private CoreSymbolication.framework library.)
p = popen(str("xcrun atos -p ", getpid(), ' ', strTrace).cStr(), "r");
#elif __CYGWIN__
wchar_t exeWinPath[MAX_PATH];
// Note: GetModuleFileNameW() takes its buffer size in wchar_t units, not bytes.
if (GetModuleFileNameW(nullptr, exeWinPath, sizeof(exeWinPath) / sizeof(wchar_t)) == 0) {
return nullptr;
}
char exePosixPath[MAX_PATH * 2];
if (cygwin_conv_path(CCP_WIN_W_TO_POSIX, exeWinPath, exePosixPath, sizeof(exePosixPath)) < 0) {
return nullptr;
}
p = popen(str("addr2line -e '", exePosixPath, "' ", strTrace).cStr(), "r");
#endif
if (p == nullptr) {
......@@ -378,7 +404,44 @@ String getStackTrace() {
return kj::str(stringifyStackTraceAddresses(trace), stringifyStackTrace(trace));
}
#if _WIN32 && _M_X64
namespace {
void terminateHandler() {
void* traceSpace[32];
// ignoreCount = 3 to ignore std::terminate entry.
auto trace = kj::getStackTrace(traceSpace, 3);
kj::String message;
auto eptr = std::current_exception();
if (eptr != nullptr) {
try {
std::rethrow_exception(eptr);
} catch (const kj::Exception& exception) {
message = kj::str("*** Fatal uncaught kj::Exception: ", exception, '\n');
} catch (const std::exception& exception) {
message = kj::str("*** Fatal uncaught std::exception: ", exception.what(),
"\nstack: ", stringifyStackTraceAddresses(trace),
stringifyStackTrace(trace), '\n');
} catch (...) {
message = kj::str("*** Fatal uncaught exception of type: ", kj::getCaughtExceptionType(),
"\nstack: ", stringifyStackTraceAddresses(trace),
stringifyStackTrace(trace), '\n');
}
} else {
message = kj::str("*** std::terminate() called with no exception"
"\nstack: ", stringifyStackTraceAddresses(trace),
stringifyStackTrace(trace), '\n');
}
kj::FdOutputStream(STDERR_FILENO).write(message.begin(), message.size());
_exit(1);
}
} // namespace
#if KJ_USE_WIN32_DBGHELP && !__CYGWIN__
namespace {
DWORD mainThreadId = 0;
......@@ -461,16 +524,41 @@ void printStackTraceOnCrash() {
mainThreadId = GetCurrentThreadId();
KJ_WIN32(SetConsoleCtrlHandler(breakHandler, TRUE));
SetUnhandledExceptionFilter(&sehHandler);
// Also override std::terminate() handler with something nicer for KJ.
std::set_terminate(&terminateHandler);
}
#elif KJ_HAS_BACKTRACE
#elif _WIN32
// Windows, but KJ_USE_WIN32_DBGHELP is not enabled. We can't print useful stack traces, so don't
// try to catch SEH nor ctrl+C.
void printStackTraceOnCrash() {
std::set_terminate(&terminateHandler);
}
#else
namespace {
void crashHandler(int signo, siginfo_t* info, void* context) {
void* traceSpace[32];
#if KJ_USE_WIN32_DBGHELP
// Win32 backtracing can't trace its way out of a Cygwin signal handler. However, Cygwin gives
// us direct access to the CONTEXT, which we can pass to the Win32 tracing functions.
ucontext_t* ucontext = reinterpret_cast<ucontext_t*>(context);
// Cygwin's mcontext_t has the same layout as CONTEXT.
// TODO(someday): Figure out why this produces garbage for SIGINT from ctrl+C. It seems to work
// correctly for SIGSEGV.
CONTEXT win32Context;
static_assert(sizeof(ucontext->uc_mcontext) >= sizeof(win32Context),
"mcontext_t should be an extension of CONTEXT");
memcpy(&win32Context, &ucontext->uc_mcontext, sizeof(win32Context));
auto trace = getStackTrace(traceSpace, 0, GetCurrentThread(), win32Context);
#else
// ignoreCount = 2 to ignore crashHandler() and signal trampoline.
auto trace = getStackTrace(traceSpace, 2);
#endif
auto message = kj::str("*** Received signal #", signo, ": ", strsignal(signo),
"\nstack: ", stringifyStackTraceAddresses(trace),
......@@ -523,9 +611,9 @@ void printStackTraceOnCrash() {
// because stack traces on ctrl+c can be obnoxious for, say, command-line tools.
KJ_SYSCALL(sigaction(SIGINT, &action, nullptr));
#endif
}
#else
void printStackTraceOnCrash() {
// Also override std::terminate() handler with something nicer for KJ.
std::set_terminate(&terminateHandler);
}
#endif
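For reference, a typical installation of these handlers, assuming only the public kj::printStackTraceOnCrash() declared in kj/exception.h:
#include <kj/exception.h>
int main() {
  // Install the SIGSEGV/SEH/Cygwin signal handlers and the std::terminate()
  // handler early, before spawning threads, so any crash prints a symbolized trace.
  kj::printStackTraceOnCrash();
  // ... rest of the program ...
  return 0;
}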
......
......@@ -120,7 +120,7 @@ TEST(Mutex, MutexGuarded) {
EXPECT_EQ(321u, *value.lockExclusive());
#if !_WIN32 // Not checked on win32.
#if !_WIN32 && !__CYGWIN__ // Not checked on win32.
EXPECT_DEBUG_ANY_THROW(value.getAlreadyLockedExclusive());
EXPECT_DEBUG_ANY_THROW(value.getAlreadyLockedShared());
#endif
......@@ -363,13 +363,15 @@ TEST(Mutex, WhenWithTimeoutPreciseTiming) {
return n == 321;
}, [](uint& n) {
return 456;
}, 20 * kj::MILLISECONDS);
}, 100 * kj::MILLISECONDS);
KJ_EXPECT(m == 456);
auto t = clock.now() - start;
KJ_EXPECT(t >= 20 * kj::MILLISECONDS);
if (t <= 22 * kj::MILLISECONDS) {
KJ_EXPECT(t >= 100 * kj::MILLISECONDS);
// Provide a large margin of error here because some operating systems (e.g. Windows) can have
// long timeslices (13ms) and won't schedule more precisely than a timeslice.
if (t <= 120 * kj::MILLISECONDS) {
return;
}
}
......@@ -395,19 +397,43 @@ TEST(Mutex, WhenWithTimeoutPreciseTimingAfterInterrupt) {
return n == 321;
}, [](uint& n) {
return 456;
}, 20 * kj::MILLISECONDS);
}, 100 * kj::MILLISECONDS);
KJ_EXPECT(m == 456);
auto t = clock.now() - start;
KJ_EXPECT(t >= 20 * kj::MILLISECONDS, t / kj::MILLISECONDS);
if (t <= 22 * kj::MILLISECONDS) {
KJ_EXPECT(t >= 100 * kj::MILLISECONDS, t / kj::MILLISECONDS);
// Provide a large margin of error here because some operating systems (e.g. Windows) can have
// long timeslices (13ms) and won't schedule more precisely than a timeslice.
if (t <= 120 * kj::MILLISECONDS) {
return;
}
}
KJ_FAIL_ASSERT("time not within expected bounds even after retries");
}
KJ_TEST("wait()s wake each other") {
MutexGuarded<uint> value(0);
{
kj::Thread thread([&]() {
auto lock = value.lockExclusive();
++*lock;
lock.wait([](uint value) { return value == 2; });
++*lock;
lock.wait([](uint value) { return value == 4; });
});
{
auto lock = value.lockExclusive();
lock.wait([](uint value) { return value == 1; });
++*lock;
lock.wait([](uint value) { return value == 3; });
++*lock;
}
}
}
TEST(Mutex, Lazy) {
Lazy<uint> lazy;
volatile bool initStarted = false;
......@@ -548,5 +574,27 @@ KJ_TEST("ExternalMutexGuarded<T> destroy without release") {
}
}
KJ_TEST("condvar wait with flapping predicate") {
// This used to deadlock under some implementations due to a wait() checking its own predicate
// as part of unlock()ing the mutex. Adding `waiterToSkip` fixed this (and also eliminated a
// redundant call to the predicate).
MutexGuarded<uint> guarded(0);
Thread thread([&]() {
delay();
*guarded.lockExclusive() = 1;
});
{
auto lock = guarded.lockExclusive();
bool flap = true;
lock.wait([&](uint i) {
flap = !flap;
return i == 1 || flap;
});
}
}
} // namespace
} // namespace kj
......@@ -19,7 +19,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#if _WIN32 || __CYGWIN__
#define WIN32_LEAN_AND_MEAN 1 // lolz
#define WINVER 0x0600
#define _WIN32_WINNT 0x0600
......@@ -28,7 +28,7 @@
#include "mutex.h"
#include "debug.h"
#if !_WIN32
#if !_WIN32 && !__CYGWIN__
#include <time.h>
#include <errno.h>
#endif
......@@ -50,7 +50,7 @@
#define FUTEX_WAKE_PRIVATE FUTEX_WAKE
#endif
#elif _WIN32
#elif _WIN32 || __CYGWIN__
#include <windows.h>
#endif
......@@ -95,7 +95,7 @@ bool Mutex::checkPredicate(Waiter& waiter) {
return result;
}
#if !_WIN32
#if !_WIN32 && !__CYGWIN__
namespace {
TimePoint toTimePoint(struct timespec ts) {
......@@ -172,7 +172,7 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
void Mutex::unlock(Exclusivity exclusivity) {
void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
switch (exclusivity) {
case EXCLUSIVE: {
KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked.");
......@@ -184,7 +184,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (checkPredicate(*waiter)) {
if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up.
if (waiter->hasTimeout) {
// In this case we need to be careful to make sure the target thread isn't already
......@@ -265,28 +265,21 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0, timeout != nullptr };
addWaiter(waiter);
// To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
// currently locked.
bool currentlyLocked = true;
KJ_DEFER({
if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
});
if (!predicate.check()) {
unlock(EXCLUSIVE);
unlock(EXCLUSIVE, &waiter);
currentlyLocked = false;
struct timespec ts;
......@@ -429,7 +422,7 @@ void Once::reset() {
}
}
#elif _WIN32
#elif _WIN32 || __CYGWIN__
// =======================================================================================
// Win32 implementation
......@@ -454,36 +447,44 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
void Mutex::unlock(Exclusivity exclusivity) {
void Mutex::wakeReadyWaiter(Waiter* waiterToSkip) {
// Look for a waiter whose predicate is now evaluating true, and wake it. We wake no more than
// one waiter because only one waiter could get the lock anyway, and once it releases that lock
// it will awake the next waiter if necessary.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use Wake vs. WakeAll here since there's always only one thread waiting.
WakeConditionVariable(&coercedCondvar(waiter->condvar));
// We only need to wake one waiter. Note that unlike the futex-based implementation, we
// cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
// that the condition is still true when that waiter finally awakes. However, if the
// condition is no longer true at that point, the waiter will re-check all other
// waiters' conditions and possibly wake up any other waiter who is now ready, hence we
// still only need to wake one waiter here.
return;
}
} else {
// No more waiters.
break;
}
}
}
void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
switch (exclusivity) {
case EXCLUSIVE: {
KJ_DEFER(ReleaseSRWLockExclusive(&coercedSrwLock));
// Check if there are any conditional waiters. Note we only do this when unlocking an
// exclusive lock since under a shared lock the state couldn't have changed.
auto nextWaiter = waitersHead;
for (;;) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use Wake vs. WakeAll here since there's always only one thread waiting.
WakeConditionVariable(&coercedCondvar(waiter->condvar));
// We only need to wake one waiter. Note that unlike the futex-based implementation, we
// cannot "transfer ownership" of the lock to the waiter, therefore we cannot guarantee
// that the condition is still true when that waiter finally awakes. However, if the
// condition is no longer true at that point, the waiter will re-check all other
// waiters' conditions and possibly wake up any other waiter who is now ready, hence we
// still only need to wake one waiter here.
return;
}
} else {
// No more waiters.
break;
}
}
wakeReadyWaiter(waiterToSkip);
break;
}
......@@ -500,12 +501,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
// held for debug purposes anyway, we just don't bother.
}
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Any exceptions should leave the mutex unlocked.
KJ_ON_SCOPE_FAILURE(unlock(EXCLUSIVE));
void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
// Add waiter to list.
Waiter waiter { nullptr, waitersTail, predicate, nullptr, 0 };
static_assert(sizeof(waiter.condvar) == sizeof(CONDITION_VARIABLE),
......@@ -542,6 +538,10 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
}
while (!predicate.check()) {
// SleepConditionVariableSRW() will temporarily release the lock, so we need to signal other
// waiters that are now ready.
wakeReadyWaiter(&waiter);
if (SleepConditionVariableSRW(&coercedCondvar(waiter.condvar), &coercedSrwLock, sleepMs, 0)) {
// Normal result. Continue loop to check predicate.
} else {
......@@ -669,7 +669,7 @@ void Mutex::lock(Exclusivity exclusivity) {
}
}
void Mutex::unlock(Exclusivity exclusivity) {
void Mutex::unlock(Exclusivity exclusivity, Waiter* waiterToSkip) {
KJ_DEFER(KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex)));
if (exclusivity == EXCLUSIVE) {
......@@ -680,7 +680,7 @@ void Mutex::unlock(Exclusivity exclusivity) {
KJ_IF_MAYBE(waiter, nextWaiter) {
nextWaiter = waiter->next;
if (checkPredicate(*waiter)) {
if (waiter != waiterToSkip && checkPredicate(*waiter)) {
// This waiter's predicate now evaluates true, so wake it up. It doesn't matter if we
// use _signal() vs. _broadcast() here since there's always only one thread waiting.
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter->stupidMutex));
......@@ -723,27 +723,20 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
lock(EXCLUSIVE);
// Since the predicate might throw, we should be careful to remember if we've locked the mutex
// and unlock it on the way out.
bool currentlyLocked = true;
KJ_ON_SCOPE_FAILURE({
if (currentlyLocked) unlock(EXCLUSIVE);
});
void Mutex::wait(Predicate& predicate, Maybe<Duration> timeout) {
// Add waiter to list.
Waiter waiter {
nullptr, waitersTail, predicate, nullptr,
PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
};
addWaiter(waiter);
// To guarantee that we've re-locked the mutex before scope exit, keep track of whether it is
// currently locked.
bool currentlyLocked = true;
KJ_DEFER({
if (!currentlyLocked) lock(EXCLUSIVE);
removeWaiter(waiter);
if (!currentlyLocked) unlock(EXCLUSIVE);
// Destroy pthread objects.
KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&waiter.stupidMutex));
......@@ -775,7 +768,7 @@ void Mutex::lockWhen(Predicate& predicate, Maybe<Duration> timeout) {
KJ_PTHREAD_CALL(pthread_mutex_lock(&waiter.stupidMutex));
// OK, now we can unlock the main mutex.
unlock(EXCLUSIVE);
unlock(EXCLUSIVE, &waiter);
currentlyLocked = false;
bool timedOut = false;
......
......@@ -33,8 +33,14 @@
#define KJ_USE_FUTEX 1
#endif
#if !KJ_USE_FUTEX && !_WIN32
// On Linux we use futex. On other platforms we wrap pthreads.
#if !KJ_USE_FUTEX && !_WIN32 && !__CYGWIN__
// We fall back to pthreads when we don't have a better platform-specific primitive. pthreads
// mutexes are bloated, though, so we like to avoid them. Hence on Linux we use futex(), and on
// Windows we use SRW locks and friends. On Cygwin we prefer the Win32 primitives both because they
// are more efficient and because I ran into problems with Cygwin's implementation of RW locks
// seeming to allow multiple threads to lock the same mutex (but I didn't investigate very
// closely).
//
// TODO(someday): Write efficient low-level locking primitives for other platforms.
#include <pthread.h>
#endif
......@@ -51,6 +57,8 @@ namespace _ { // private
class Mutex {
// Internal implementation details. See `MutexGuarded<T>`.
struct Waiter;
public:
Mutex();
~Mutex();
......@@ -62,7 +70,7 @@ public:
};
void lock(Exclusivity exclusivity);
void unlock(Exclusivity exclusivity);
void unlock(Exclusivity exclusivity, Waiter* waiterToSkip = nullptr);
void assertLockedByCaller(Exclusivity exclusivity);
// In debug mode, assert that the mutex is locked by the calling thread, or if that is
......@@ -74,14 +82,16 @@ public:
virtual bool check() = 0;
};
void lockWhen(Predicate& predicate, Maybe<Duration> timeout = nullptr);
// Lock (exclusively) when predicate.check() returns true, or when the timeout (if any) expires.
// The mutex is always locked when this returns regardless of whether the timeout expired (and
// always unlocked if it throws).
void wait(Predicate& predicate, Maybe<Duration> timeout = nullptr);
// If predicate.check() returns false, unlock the mutex until predicate.check() returns true or
// until the timeout (if any) expires. The mutex is always re-locked when this returns, regardless
// of whether the timeout expired, even if this throws.
//
// Requires that the mutex is already exclusively locked before calling.
void induceSpuriousWakeupForTest();
// Utility method for mutex-test.c++ which causes a spurious thread wakeup on all threads that
// are waiting for a lockWhen() condition. Assuming correct implementation, all those threads
// are waiting for a wait() condition. Assuming correct implementation, all those threads
// should immediately go back to sleep.
private:
......@@ -97,7 +107,7 @@ private:
static constexpr uint EXCLUSIVE_REQUESTED = 1u << 30;
static constexpr uint SHARED_COUNT_MASK = EXCLUSIVE_REQUESTED - 1;
#elif _WIN32
#elif _WIN32 || __CYGWIN__
uintptr_t srwLock; // Actually an SRWLOCK, but don't want to #include <windows.h> in header.
#else
......@@ -112,7 +122,7 @@ private:
#if KJ_USE_FUTEX
uint futex;
bool hasTimeout;
#elif _WIN32
#elif _WIN32 || __CYGWIN__
uintptr_t condvar;
// Actually CONDITION_VARIABLE, but don't want to #include <windows.h> in header.
#else
......@@ -131,6 +141,9 @@ private:
inline void addWaiter(Waiter& waiter);
inline void removeWaiter(Waiter& waiter);
bool checkPredicate(Waiter& waiter);
#if _WIN32 || __CYGWIN__
void wakeReadyWaiter(Waiter* waiterToSkip);
#endif
};
class Once {
......@@ -153,7 +166,7 @@ public:
void runOnce(Initializer& init);
#if _WIN32 // TODO(perf): Can we make this inline on win32 somehow?
#if _WIN32 || __CYGWIN__ // TODO(perf): Can we make this inline on win32 somehow?
bool isInitialized() noexcept;
#else
......@@ -183,7 +196,7 @@ private:
INITIALIZED
};
#elif _WIN32
#elif _WIN32 || __CYGWIN__
uintptr_t initOnce; // Actually an INIT_ONCE, but don't want to #include <windows.h> in header.
#else
......@@ -241,6 +254,31 @@ public:
inline operator T*() { return ptr; }
inline operator const T*() const { return ptr; }
template <typename Cond>
void wait(Cond&& condition, Maybe<Duration> timeout = nullptr) {
// Unlocks the lock until `condition(state)` evaluates true (where `state` is type `const T&`
// referencing the object protected by the lock).
// We can't wait on a shared lock because the internal bookkeeping needed for a wait requires
// the protection of an exclusive lock.
static_assert(!isConst<T>(), "cannot wait() on shared lock");
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
return condition(value);
}
Cond&& condition;
const T& value;
PredicateImpl(Cond&& condition, const T& value)
: condition(kj::fwd<Cond>(condition)), value(value) {}
};
PredicateImpl impl(kj::fwd<Cond>(condition), *ptr);
mutex->wait(impl, timeout);
}
private:
_::Mutex* mutex;
T* ptr;
......@@ -321,22 +359,11 @@ public:
// If `timeout` is specified, then after the given amount of time, the callback will be called
// regardless of whether the condition is true. In this case, when `callback()` is called,
// `condition()` may in fact evaluate false, but *only* if the timeout was reached.
//
// TODO(cleanup): lock->wait() is a better interface. Can we deprecate this one?
struct PredicateImpl final: public _::Mutex::Predicate {
bool check() override {
return condition(value);
}
Cond&& condition;
const T& value;
PredicateImpl(Cond&& condition, const T& value)
: condition(kj::fwd<Cond>(condition)), value(value) {}
};
PredicateImpl impl(kj::fwd<Cond>(condition), value);
mutex.lockWhen(impl, timeout);
KJ_DEFER(mutex.unlock(_::Mutex::EXCLUSIVE));
auto lock = lockExclusive();
lock.wait(kj::fwd<Cond>(condition), timeout);
return callback(value);
}
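A hedged sketch of the two condition-wait styles side by side, per the APIs above; the counter/producer names are invented for the example.
#include <kj/mutex.h>
#include <kj/thread.h>
#include <kj/debug.h>
int main() {
  kj::MutexGuarded<int> counter(0);
  kj::Thread producer([&]() {
    *counter.lockExclusive() = 42;  // wakes any waiter whose predicate now holds
  });
  {
    // New style: take the lock, then wait until the predicate is true.
    auto lock = counter.lockExclusive();
    lock.wait([](const int& n) { return n == 42; });
    KJ_ASSERT(*lock == 42);
  }
  // Older style: when() locks, waits, runs the callback, and unlocks.
  int doubled = counter.when(
      [](const int& n) { return n == 42; },
      [](int& n) { return n * 2; });
  KJ_ASSERT(doubled == 84);
  return 0;
}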
......
......@@ -36,7 +36,7 @@
// windows-sanity.h, we can be sure that no conflicts will occur regardless of in what order the
// application chooses to include these headers vs. windows.h.
#if !_WIN32
#if !_WIN32 && !__CYGWIN__
// Not on Windows. Tell the compiler never to try to include this again.
#pragma once
......