Commit 0028d85c authored by Kenton Varda's avatar Kenton Varda

Extend KJ event loop to support cross-thread events.

The new `Executor` class provides an interface to run a function on some other thread's EventLoop.

`kj::getCurrentThreadExecutor()` gets a reference to this thread's executor, which can then be exposed to other threads.

When a thread requests execution of a function on another thread, the return value is returned to the requesting thread. The requesting thread may choose to wait synchronously for this return value or -- if the requesting thread has an event loop of its own -- it can get Promise for the eventual result.

Meanwhile, orthogonally, the function can either return a raw result or return a Promise; in the latter case, the Promise is resolved to completion in the executor thread and the final result is sent back to the requesting thread.
parent 2625589f
...@@ -78,12 +78,43 @@ public: ...@@ -78,12 +78,43 @@ public:
Maybe<T> value; Maybe<T> value;
}; };
template <typename T>
inline T convertToReturn(ExceptionOr<T>&& result) {
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
inline void convertToReturn(ExceptionOr<Void>&& result) {
// Override <void> case to use throwRecoverableException().
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
class Event { class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this // An event waiting to be executed. Not for direct use by applications -- promises use this
// internally. // internally.
public: public:
Event(); Event();
Event(kj::EventLoop& loop);
~Event() noexcept(false); ~Event() noexcept(false);
KJ_DISALLOW_COPY(Event); KJ_DISALLOW_COPY(Event);
...@@ -105,6 +136,10 @@ public: ...@@ -105,6 +136,10 @@ public:
void armBreadthFirst(); void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue. // Like `armDepthFirst()` except that the event is placed at the end of the queue.
void disarm();
// If the event is armed but hasn't fired, cancel it. (Destroying the event does this
// implicitly.)
kj::String trace(); kj::String trace();
// Dump debug info about this event. // Dump debug info about this event.
...@@ -165,6 +200,7 @@ protected: ...@@ -165,6 +200,7 @@ protected:
void init(Event* newEvent); void init(Event* newEvent);
void arm(); void arm();
void armBreadthFirst();
// Arms the event if init() has already been called and makes future calls to init() // Arms the event if init() has already been called and makes future calls to init()
// automatically arm the event. // automatically arm the event.
...@@ -877,40 +913,8 @@ Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) { ...@@ -877,40 +913,8 @@ Promise<T> Promise<T>::catch_(ErrorFunc&& errorHandler) {
template <typename T> template <typename T>
T Promise<T>::wait(WaitScope& waitScope) { T Promise<T>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::FixVoid<T>> result; _::ExceptionOr<_::FixVoid<T>> result;
_::waitImpl(kj::mv(node), result, waitScope); _::waitImpl(kj::mv(node), result, waitScope);
return convertToReturn(kj::mv(result));
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
template <>
inline void Promise<void>::wait(WaitScope& waitScope) {
// Override <void> case to use throwRecoverableException().
_::ExceptionOr<_::Void> result;
_::waitImpl(kj::mv(node), result, waitScope);
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
} }
template <typename T> template <typename T>
...@@ -1139,4 +1143,155 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() { ...@@ -1139,4 +1143,155 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() {
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) }; return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
} }
// =======================================================================================
// cross-thread stuff
namespace _ { // (private)
class XThreadEvent: private Event, // it's an event in the target thread
public PromiseNode { // it's a PromiseNode in the requesting thread
public:
XThreadEvent(ExceptionOrValue& result, const Executor& targetExecutor)
: Event(targetExecutor.loop), result(result), targetExecutor(targetExecutor) {}
protected:
void ensureDoneOrCanceled();
// MUST be called in destructor of subclasses to make sure the object is not destroyed while
// still being accessed by the other thread. (This can't be placed in ~XThreadEvent() because
// that destructor doesn't run until the subclass has already been destroyed.)
virtual kj::Maybe<Own<PromiseNode>> execute() = 0;
// Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
// returns null.
template <typename T>
Own<PromiseNode> extractNode(Promise<T> promise) { return kj::mv(promise.node); }
// implements PromiseNode ----------------------------------------------------
void onReady(Event* event) noexcept override;
private:
ExceptionOrValue& result;
const Executor& targetExecutor;
Maybe<const Executor&> replyExecutor; // If executeAsync() was used.
kj::Maybe<Own<PromiseNode>> promiseNode;
// Accessed only in target thread.
Maybe<XThreadEvent&> targetNext;
Maybe<XThreadEvent&>* targetPrev = nullptr;
// Membership in one of the linked lists in the target Executor's work list or cancel list. These
// fields are protected by the target Executor's mutex.
enum {
UNUSED,
// Object was never queued on another thread.
QUEUED,
// Target thread has not yet dequeued the event from crossThreadRequests.events. The requesting
// thread can cancel execution by removing the event from the list.
EXECUTING,
// Target thread has dequeued the event and is executing it. To cancel, the requesting thread
// must add the event to the crossThreadRequests.cancel list.
DONE
// Target thread has completed handling this event and will not touch it again. The requesting
// thread can safely delete the object. The `state` is updated to `DONE` using an atomic
// release operation after ensuring that the event will not be touched again, so that the
// requesting can safely skip locking if it observes the state is already DONE.
} state = UNUSED;
// State, which is also protected by `targetExecutor`'s mutex.
Maybe<XThreadEvent&> replyNext;
Maybe<XThreadEvent&>* replyPrev = nullptr;
// Membership in `replyExecutor`'s reply list. Protected by `replyExecutor`'s mutex. The
// executing thread places the event in the reply list near the end of the `EXECUTING` state.
// Because the thread cannot lock two mutexes at once, it's possible that the reply executor
// will receive the reply while the event is still listed in the EXECUTING state, but it can
// ignore the state and proceed with the result.
OnReadyEvent onReadyEvent;
// Accessed only in requesting thread.
friend class kj::Executor;
void done();
// implements Event ----------------------------------------------------------
Maybe<Own<Event>> fire() override;
// If called with promiseNode == nullptr, it's time to call execute(). If promiseNode != nullptr,
// then it just indicated readiness and we need to get its result.
};
template <typename Func, typename = _::FixVoid<_::ReturnType<Func, void>>>
class XThreadEventImpl final: public XThreadEvent {
// Implementation for a function that does not return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target)
: XThreadEvent(result, target), func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::ReturnType<Func, void>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
result.value = MaybeVoidCaller<Void, FixVoid<decltype(func())>>::apply(func, Void());
return nullptr;
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
template <typename Func, typename T>
class XThreadEventImpl<Func, Promise<T>> final: public XThreadEvent {
// Implementation for a function that DOES return a Promise.
public:
XThreadEventImpl(Func&& func, const Executor& target)
: XThreadEvent(result, target), func(kj::fwd<Func>(func)) {}
~XThreadEventImpl() noexcept(false) { ensureDoneOrCanceled(); }
typedef _::FixVoid<_::UnwrapPromise<PromiseForResult<Func, void>>> ResultT;
kj::Maybe<Own<_::PromiseNode>> execute() override {
auto result = extractNode(func());
KJ_IREQUIRE(result.get() != nullptr);
return kj::mv(result);
}
// implements PromiseNode ----------------------------------------------------
void get(ExceptionOrValue& output) noexcept override {
output.as<ResultT>() = kj::mv(result);
}
private:
Func func;
ExceptionOr<ResultT> result;
friend Executor;
};
} // namespace _ (private)
template <typename Func>
_::UnwrapPromise<PromiseForResult<Func, void>> Executor::executeSync(Func&& func) const {
_::XThreadEventImpl<Func> event(kj::fwd<Func>(func), *this);
send(event, true);
return convertToReturn(kj::mv(event.result));
}
template <typename Func>
PromiseForResult<Func, void> Executor::executeAsync(Func&& func) const {
auto event = kj::heap<_::XThreadEventImpl<Func>>(kj::fwd<Func>(func), *this);
send(*event, false);
return PromiseForResult<Func, void>(false, kj::mv(event));
}
} // namespace kj } // namespace kj
...@@ -68,6 +68,12 @@ using ReducePromises = decltype(reducePromiseType((T*)nullptr, false)); ...@@ -68,6 +68,12 @@ using ReducePromises = decltype(reducePromiseType((T*)nullptr, false));
// reduces Promise<T> to something else. In particular this allows Promise<capnp::RemotePromise<U>> // reduces Promise<T> to something else. In particular this allows Promise<capnp::RemotePromise<U>>
// to reduce to capnp::RemotePromise<U>. // to reduce to capnp::RemotePromise<U>.
template <typename T> struct UnwrapPromise_;
template <typename T> struct UnwrapPromise_<Promise<T>> { typedef T Type; };
template <typename T>
using UnwrapPromise = typename UnwrapPromise_<T>::Type;
class PropagateException { class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of // A functor which accepts a kj::Exception as a parameter and returns a broken promise of
// arbitrary type which simply propagates the exception. // arbitrary type which simply propagates the exception.
...@@ -186,6 +192,7 @@ template <typename T> ...@@ -186,6 +192,7 @@ template <typename T>
class ForkHub; class ForkHub;
class Event; class Event;
class XThreadEvent;
class PromiseBase { class PromiseBase {
public: public:
...@@ -206,6 +213,7 @@ private: ...@@ -206,6 +213,7 @@ private:
template <typename U> template <typename U>
friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises); friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises);
friend Promise<void> kj::joinPromises(Array<Promise<void>>&& promises); friend Promise<void> kj::joinPromises(Array<Promise<void>>&& promises);
friend class XThreadEvent;
}; };
void detach(kj::Promise<void>&& promise); void detach(kj::Promise<void>&& promise);
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include "async.h" #include "async.h"
#include "debug.h" #include "debug.h"
#include "thread.h"
#include "mutex.h"
#include <kj/compat/gtest.h> #include <kj/compat/gtest.h>
namespace kj { namespace kj {
...@@ -826,5 +828,261 @@ KJ_TEST("exclusiveJoin both events complete simultaneously") { ...@@ -826,5 +828,261 @@ KJ_TEST("exclusiveJoin both events complete simultaneously") {
KJ_EXPECT(!joined.poll(waitScope)); KJ_EXPECT(!joined.poll(waitScope));
} }
KJ_TEST("synchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
KJ_TEST("asynchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
EventLoop loop;
WaitScope waitScope(loop);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}).wait(waitScope));
Promise<uint> promise = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456u;
});
KJ_EXPECT(promise.wait(waitScope) == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
KJ_TEST("synchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(i == 321);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
KJ_TEST("asynchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
EventLoop loop;
WaitScope waitScope(loop);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}).wait(waitScope));
Promise<uint> promise2 = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(promise2.wait(waitScope) == 321);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/time.h> #include <sys/time.h>
#include <errno.h> #include <errno.h>
#include "mutex.h"
namespace kj { namespace kj {
namespace { namespace {
...@@ -774,6 +775,67 @@ TEST(AsyncUnixTest, ChildProcess) { ...@@ -774,6 +775,67 @@ TEST(AsyncUnixTest, ChildProcess) {
// child3 will be killed and synchronously waited on the way out. // child3 will be killed and synchronously waited on the way out.
} }
KJ_TEST("UnixEventPort cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "async-win32.h" #include "async-win32.h"
#include "thread.h" #include "thread.h"
#include "test.h" #include "test.h"
#include "mutex.h"
namespace kj { namespace kj {
namespace { namespace {
...@@ -162,6 +163,66 @@ KJ_TEST("Win32IocpEventPort APC") { ...@@ -162,6 +163,66 @@ KJ_TEST("Win32IocpEventPort APC") {
paf.promise.wait(waitScope); paf.promise.wait(waitScope);
} }
KJ_TEST("Win32IocpEventPort cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
Win32IocpEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "debug.h" #include "debug.h"
#include "vector.h" #include "vector.h"
#include "threadlocal.h" #include "threadlocal.h"
#include "mutex.h"
#if KJ_USE_FUTEX #if KJ_USE_FUTEX
#include <unistd.h> #include <unistd.h>
...@@ -272,6 +273,243 @@ LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler(); ...@@ -272,6 +273,243 @@ LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();
// ======================================================================================= // =======================================================================================
struct Executor::Impl {
typedef Maybe<_::XThreadEvent&> _::XThreadEvent::*NextMember;
typedef Maybe<_::XThreadEvent&>* _::XThreadEvent::*PrevMember;
template <NextMember next, PrevMember prev>
struct List {
kj::Maybe<_::XThreadEvent&> head;
kj::Maybe<_::XThreadEvent&>* tail = &head;
bool empty() const {
return head == nullptr;
}
void insert(_::XThreadEvent& event) {
KJ_REQUIRE(event.*prev == nullptr);
*tail = event;
event.*prev = tail;
tail = &(event.*next);
}
void erase(_::XThreadEvent& event) {
KJ_REQUIRE(event.*prev != nullptr);
*(event.*prev) = event.*next;
KJ_IF_MAYBE(n, event.*next) {
n->*prev = event.*prev;
} else {
KJ_DASSERT(tail == &(event.*next));
tail = event.*prev;
}
event.*next = nullptr;
event.*prev = nullptr;
}
template <typename Func>
void forEach(Func&& func) {
kj::Maybe<_::XThreadEvent&> current = head;
for (;;) {
KJ_IF_MAYBE(c, current) {
auto nextItem = c->*next;
func(*c);
current = nextItem;
} else {
break;
}
}
}
};
struct State {
// Queues of notifications from other threads that need this thread's attention.
List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> run;
List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> cancel;
List<&_::XThreadEvent::replyNext, &_::XThreadEvent::replyPrev> replies;
bool empty() const {
return run.empty() && cancel.empty() && replies.empty();
}
void dispatchAll(Vector<Own<_::PromiseNode>>& nodesToDeleteOutsideLock) {
run.forEach([&](_::XThreadEvent& event) {
run.erase(event);
event.state = _::XThreadEvent::EXECUTING;
event.armBreadthFirst();
});
cancel.forEach([&](_::XThreadEvent& event) {
cancel.erase(event);
if (event.state == _::XThreadEvent::EXECUTING) {
KJ_IF_MAYBE(n, event.promiseNode) {
nodesToDeleteOutsideLock.add(kj::mv(*n));
event.promiseNode = nullptr;
}
event.disarm();
event.state = _::XThreadEvent::DONE;
}
event.armBreadthFirst();
});
replies.forEach([&](_::XThreadEvent& event) {
replies.erase(event);
event.onReadyEvent.armBreadthFirst();
});
}
};
kj::MutexGuarded<State> state;
// After modifying state from another thread, the loop's port.wake() must be called.
};
namespace _ { // (private)
void XThreadEvent::ensureDoneOrCanceled() {
#if _MSC_VER
{ // TODO(perf): TODO(msvc): Implement the double-checked lock optimization on MSVC.
#else
if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
#endif
auto lock = targetExecutor.impl->state.lockExclusive();
switch (state) {
case UNUSED:
// Nothing to do.
break;
case QUEUED:
lock->run.erase(*this);
state = DONE;
break;
case EXECUTING:
lock->cancel.insert(*this);
lock.wait([&](auto&) { return state == DONE; });
KJ_DASSERT(targetPrev == nullptr);
break;
case DONE:
// Became done while we waited for lock. Nothing to do.
break;
}
}
KJ_IF_MAYBE(e, replyExecutor) {
// Since we know we reached the DONE state (or never left UNUSED), we know that the remote
// thread is all done playing with our `replyPrev` pointer. Only the current thread could
// possibly modify it after this point. So we can skip the lock if it's already null.
if (replyPrev != nullptr) {
auto lock = e->impl->state.lockExclusive();
lock->replies.erase(*this);
}
}
}
void XThreadEvent::done() {
KJ_IF_MAYBE(e, replyExecutor) {
// Queue the reply.
auto lock = e->impl->state.lockExclusive();
lock->replies.insert(*this);
}
{
auto lock = targetExecutor.impl->state.lockExclusive();
KJ_DASSERT(state == EXECUTING);
if (targetPrev != nullptr) {
// We must be in the cancel list, because we can't be in the run list during EXECUTING state.
// We can remove ourselves from the cancel list because at this point we're done anyway, so
// whatever.
lock->cancel.erase(*this);
}
#if _MSC_VER
// TODO(perf): TODO(msvc): Implement the double-checked lock optimization on MSVC.
state = DONE;
#else
__atomic_store_n(&state, DONE, __ATOMIC_RELEASE);
#endif
}
}
Maybe<Own<Event>> XThreadEvent::fire() {
KJ_IF_MAYBE(n, promiseNode) {
n->get()->get(result);
done();
} else {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
promiseNode = execute();
})) {
result.addException(kj::mv(*exception));
};
KJ_IF_MAYBE(n, promiseNode) {
n->get()->onReady(this);
} else {
done();
}
}
return nullptr;
}
void XThreadEvent::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
} // namespace _
Executor::Executor(EventLoop& loop, Badge<EventLoop>): loop(loop), impl(kj::heap<Impl>()) {}
Executor::~Executor() noexcept(false) {}
void Executor::send(_::XThreadEvent& event, bool sync) const {
KJ_ASSERT(event.state == _::XThreadEvent::UNUSED);
if (!sync) {
event.replyExecutor = getCurrentThreadExecutor();
}
auto lock = impl->state.lockExclusive();
event.state = _::XThreadEvent::QUEUED;
lock->run.insert(event);
KJ_IF_MAYBE(p, loop.port) {
p->wake();
} else {
// Event loop will be waiting on executor.wait(), which will be woken when we unlock the mutex.
}
if (sync) {
lock.wait([&](auto&) { return event.state == _::XThreadEvent::DONE; });
}
}
void Executor::wait() {
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock;
auto lock = impl->state.lockExclusive();
lock.wait([](const Impl::State& state) {
return !state.empty();
});
lock->dispatchAll(nodesToDeleteOutsideLock);
}
bool Executor::poll() {
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock;
auto lock = impl->state.lockExclusive();
if (lock->empty()) {
return false;
} else {
lock->dispatchAll(nodesToDeleteOutsideLock);
return true;
}
}
const Executor& getCurrentThreadExecutor() {
return currentEventLoop().getExecutor();
}
// =======================================================================================
void EventPort::setRunnable(bool runnable) {} void EventPort::setRunnable(bool runnable) {}
void EventPort::wake() const { void EventPort::wake() const {
...@@ -362,6 +600,14 @@ bool EventLoop::isRunnable() { ...@@ -362,6 +600,14 @@ bool EventLoop::isRunnable() {
return head != nullptr; return head != nullptr;
} }
const Executor& EventLoop::getExecutor() {
KJ_IF_MAYBE(e, executor) {
return *e;
} else {
return executor.emplace(*this, Badge<EventLoop>());
}
}
void EventLoop::setRunnable(bool runnable) { void EventLoop::setRunnable(bool runnable) {
if (runnable != lastRunnableState) { if (runnable != lastRunnableState) {
KJ_IF_MAYBE(p, port) { KJ_IF_MAYBE(p, port) {
...@@ -384,6 +630,34 @@ void EventLoop::leaveScope() { ...@@ -384,6 +630,34 @@ void EventLoop::leaveScope() {
threadLocalEventLoop = nullptr; threadLocalEventLoop = nullptr;
} }
void EventLoop::wait() {
KJ_IF_MAYBE(p, port) {
if (p->wait()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->wait();
} else {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
}
void EventLoop::poll() {
KJ_IF_MAYBE(p, port) {
if (p->poll()) {
// Another thread called wake(). Check for cross-thread events.
KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
} else KJ_IF_MAYBE(e, executor) {
e->poll();
}
}
void WaitScope::poll() { void WaitScope::poll() {
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread."); KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks."); KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
...@@ -394,9 +668,7 @@ void WaitScope::poll() { ...@@ -394,9 +668,7 @@ void WaitScope::poll() {
for (;;) { for (;;) {
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Poll for I/O. // No events in the queue. Poll for I/O.
KJ_IF_MAYBE(p, loop.port) { loop.poll();
p->poll();
}
if (!loop.isRunnable()) { if (!loop.isRunnable()) {
// Still no events in the queue. We're done. // Still no events in the queue. We're done.
...@@ -425,17 +697,11 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope ...@@ -425,17 +697,11 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Wait for callback. // No events in the queue. Wait for callback.
counter = 0; counter = 0;
KJ_IF_MAYBE(p, loop.port) { loop.wait();
p->wait();
} else {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
} else if (++counter > waitScope.busyPollInterval) { } else if (++counter > waitScope.busyPollInterval) {
// Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll. // Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll.
counter = 0; counter = 0;
KJ_IF_MAYBE(p, loop.port) { loop.poll();
p->poll();
}
} }
} }
...@@ -463,9 +729,7 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) { ...@@ -463,9 +729,7 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
while (!doneEvent.fired) { while (!doneEvent.fired) {
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Poll for I/O. // No events in the queue. Poll for I/O.
KJ_IF_MAYBE(p, loop.port) { loop.poll();
p->poll();
}
if (!doneEvent.fired && !loop.isRunnable()) { if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up. // No progress. Give up.
...@@ -503,30 +767,19 @@ void detach(kj::Promise<void>&& promise) { ...@@ -503,30 +767,19 @@ void detach(kj::Promise<void>&& promise) {
Event::Event() Event::Event()
: loop(currentEventLoop()), next(nullptr), prev(nullptr) {} : loop(currentEventLoop()), next(nullptr), prev(nullptr) {}
Event::~Event() noexcept(false) { Event::Event(kj::EventLoop& loop)
if (prev != nullptr) { : loop(loop), next(nullptr), prev(nullptr) {}
if (loop.tail == &next) {
loop.tail = prev;
}
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
*prev = next; Event::~Event() noexcept(false) {
if (next != nullptr) { disarm();
next->prev = prev;
}
}
KJ_REQUIRE(!firing, "Promise callback destroyed itself."); KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Promise destroyed from a different thread than it was created in.");
} }
void Event::armDepthFirst() { void Event::armDepthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use " "Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread."); "Executor to queue events cross-thread.");
if (prev == nullptr) { if (prev == nullptr) {
next = *loop.depthFirstInsertPoint; next = *loop.depthFirstInsertPoint;
...@@ -549,7 +802,7 @@ void Event::armDepthFirst() { ...@@ -549,7 +802,7 @@ void Event::armDepthFirst() {
void Event::armBreadthFirst() { void Event::armBreadthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use " "Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread."); "Executor to queue events cross-thread.");
if (prev == nullptr) { if (prev == nullptr) {
next = *loop.tail; next = *loop.tail;
...@@ -565,6 +818,31 @@ void Event::armBreadthFirst() { ...@@ -565,6 +818,31 @@ void Event::armBreadthFirst() {
} }
} }
void Event::disarm() {
if (prev != nullptr) {
if (threadLocalEventLoop != &loop && threadLocalEventLoop != nullptr) {
KJ_LOG(FATAL, "Promise destroyed from a different thread than it was created in.");
// There's no way out of this place without UB, so abort now.
abort();
}
if (loop.tail == &next) {
loop.tail = prev;
}
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
*prev = next;
if (next != nullptr) {
next->prev = prev;
}
prev = nullptr;
next = nullptr;
}
}
_::PromiseNode* Event::getInnerForTrace() { _::PromiseNode* Event::getInnerForTrace() {
return nullptr; return nullptr;
} }
...@@ -646,6 +924,17 @@ void PromiseNode::OnReadyEvent::arm() { ...@@ -646,6 +924,17 @@ void PromiseNode::OnReadyEvent::arm() {
event = _kJ_ALREADY_READY; event = _kJ_ALREADY_READY;
} }
void PromiseNode::OnReadyEvent::armBreadthFirst() {
KJ_ASSERT(event != _kJ_ALREADY_READY, "armBreadthFirst() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it.
event->armBreadthFirst();
}
event = _kJ_ALREADY_READY;
}
// ------------------------------------------------------------------- // -------------------------------------------------------------------
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {} ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
......
...@@ -322,6 +322,8 @@ private: ...@@ -322,6 +322,8 @@ private:
template <typename U> template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises); friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
friend Promise<void> joinPromises(Array<Promise<void>>&& promises); friend Promise<void> joinPromises(Array<Promise<void>>&& promises);
friend class _::XThreadEvent;
friend class Executor;
}; };
template <typename T> template <typename T>
...@@ -649,6 +651,97 @@ private: ...@@ -649,6 +651,97 @@ private:
Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller; Maybe<Own<PromiseFulfiller<void>>> emptyFulfiller;
}; };
// =======================================================================================
// Cross-thread execution.
class Executor {
// Executes code on another thread's event loop.
//
// Use `kj::getCurrentThreadExecutor()` to get an executor that schedules calls on the current
// thread's event loop. You may then pass the reference to other threads to enable them to call
// back to this one.
public:
Executor(EventLoop& loop, Badge<EventLoop>);
~Executor() noexcept(false);
template <typename Func>
PromiseForResult<Func, void> executeAsync(Func&& func) const;
// Call from any thread to request that the given function be executed on the executor's thread,
// returning a promise for the result.
//
// The Promise returned by executeAsync() belongs to the requesting thread, not the executor
// thread. Hence, for example, continuations added to this promise with .then() will exceute in
// the requesting thread.
//
// If func() itself returns a Promise, that Promise is *not* returned verbatim to the requesting
// thread -- after all, Promise objects cannot be used cross-thread. Instead, the executor thread
// awaits the promise. Once it resolves to a final result, that result is transferred to the
// requesting thread, resolving the promise that executeAsync() returned earlier.
//
// `func` will be destroyed in the requesting thread, after the final result has been returned
// from the executor thread. This means that it is safe for `func` to capture objects that cannot
// safely be destroyed from another thread. It is also safe for `func` to be an lvalue reference,
// so long as the functor remains live until the promise completes or is canceled, and the
// function is thread-safe.
//
// Of course, the body of `func` must be careful that any access it makes on these objects is
// safe cross-thread. For example, it must not attempt to access Promise-related objects
// cross-thread; you cannot create a `PromiseFulfiller` in one thread and then `fulfill()` it
// from another. Unfortunately, the usual convention of using const-correctness to enforce
// thread-safety does not work here, because applications can often ensure that `func` has
// exclusive access to captured objects, and thus can safely mutate them even in non-thread-safe
// ways; the const qualifier is not sufficient to express this.
//
// The final return value of `func` is transferred between threads, and hence is constructed and
// destroyed in separate threads. It is the app's responsibility to make sure this is OK.
// Alternatively, the app can perhaps arrange to send the return value back to the original
// thread for destruction, if needed.
//
// TODO(now): Decide if we should automatically wrap the return value such that it will be
// returned to its own thread for destruction.
//
// If the requesting thread destroys the returned Promise, the destructor will block waiting for
// the executor thread to acknowledge cancellation. This ensures that `func` can be destroyed
// before the Promise's destructor returns.
//
// Multiple calls to executeAsync() from the same requesting thread to the same target thread
// will be delivered in the same order in which they were requested. (However, if func() returns
// a promise, delivery of subsequent calls is not blocked on that promise. In other words, this
// call provides E-Order in the same way as Cap'n Proto.)
template <typename Func>
_::UnwrapPromise<PromiseForResult<Func, void>> executeSync(Func&& func) const;
// Schedules `func()` to execute on the executor thread, and then blocks the requesting thread
// until `func()` completes. If `func()` returns a Promise, then the wait will continue until
// that promise resolves, and the final result will be returned to the requesting thread.
//
// The requesting thread does not need to have an EventLoop. If it does have an EventLoop, that
// loop will *not* execute while the thread is blocked. This method is particularly useful to
// allow non-event-loop threads to perform I/O via a separate event-loop thread.
//
// As with `executeAsync()`, `func` is always destroyed on the requesting thread, after the
// executor thread has signaled completion. The return value is transferred between threads.
private:
EventLoop& loop;
struct Impl;
Own<Impl> impl;
// To avoid including mutex.h...
friend class EventLoop;
friend class _::XThreadEvent;
void send(_::XThreadEvent& event, bool sync) const;
void wait();
bool poll();
};
const Executor& getCurrentThreadExecutor();
// Get the executor for the current thread's event loop. This reference can then be passed to other
// threads.
// ======================================================================================= // =======================================================================================
// The EventLoop class // The EventLoop class
...@@ -753,6 +846,15 @@ public: ...@@ -753,6 +846,15 @@ public:
bool isRunnable(); bool isRunnable();
// Returns true if run() would currently do anything, or false if the queue is empty. // Returns true if run() would currently do anything, or false if the queue is empty.
const Executor& getExecutor();
// Returns an Executor that can be used to schedule events on this EventLoop from another thread.
//
// Use the global function kj::getCurrentThreadExecutor() to get the current thread's EventLoop's
// Executor.
//
// Note that this is only needed for cross-thread scheduling. To schedule code to run later in
// the current thread, use `kj::evalLater()`, which will be more efficient.
private: private:
kj::Maybe<EventPort&> port; kj::Maybe<EventPort&> port;
// If null, this thread doesn't receive I/O events from the OS. It can potentially receive // If null, this thread doesn't receive I/O events from the OS. It can potentially receive
...@@ -768,6 +870,9 @@ private: ...@@ -768,6 +870,9 @@ private:
_::Event** tail = &head; _::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head; _::Event** depthFirstInsertPoint = &head;
kj::Maybe<Executor> executor;
// Allocated the first time getExecutor() is requested, making cross-thread request possible.
Own<TaskSet> daemons; Own<TaskSet> daemons;
bool turn(); bool turn();
...@@ -775,12 +880,16 @@ private: ...@@ -775,12 +880,16 @@ private:
void enterScope(); void enterScope();
void leaveScope(); void leaveScope();
void wait();
void poll();
friend void _::detach(kj::Promise<void>&& promise); friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope); WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope); friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
friend class _::Event; friend class _::Event;
friend class WaitScope; friend class WaitScope;
friend class Executor;
}; };
class WaitScope { class WaitScope {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment