Commit 5d175beb authored by Kenton Varda's avatar Kenton Varda

Move cross-thread async tests into a separate file.

This commit does not change any of the code.

A subsequent commit will take advantage of this to apply the exact same tests when an EventPort is in use.
parent ff4d2996
...@@ -460,6 +460,7 @@ else !LITE_MODE ...@@ -460,6 +460,7 @@ else !LITE_MODE
check_PROGRAMS = capnp-test capnp-evolution-test capnp-afl-testcase check_PROGRAMS = capnp-test capnp-evolution-test capnp-afl-testcase
heavy_tests = \ heavy_tests = \
src/kj/async-test.c++ \ src/kj/async-test.c++ \
src/kj/async-xthread-test.c++ \
src/kj/async-unix-test.c++ \ src/kj/async-unix-test.c++ \
src/kj/async-win32-test.c++ \ src/kj/async-win32-test.c++ \
src/kj/async-io-test.c++ \ src/kj/async-io-test.c++ \
......
...@@ -215,6 +215,7 @@ if(BUILD_TESTING) ...@@ -215,6 +215,7 @@ if(BUILD_TESTING)
if(NOT CAPNP_LITE) if(NOT CAPNP_LITE)
add_executable(kj-heavy-tests add_executable(kj-heavy-tests
async-test.c++ async-test.c++
async-xthread-test.c++
async-unix-test.c++ async-unix-test.c++
async-win32-test.c++ async-win32-test.c++
async-io-test.c++ async-io-test.c++
......
...@@ -21,19 +21,8 @@ ...@@ -21,19 +21,8 @@
#include "async.h" #include "async.h"
#include "debug.h" #include "debug.h"
#include "thread.h"
#include "mutex.h"
#include <kj/compat/gtest.h> #include <kj/compat/gtest.h>
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#include <windows.h>
inline void delay() { Sleep(10); }
#else
#include <unistd.h>
inline void delay() { usleep(10000); }
#endif
namespace kj { namespace kj {
namespace { namespace {
...@@ -837,385 +826,5 @@ KJ_TEST("exclusiveJoin both events complete simultaneously") { ...@@ -837,385 +826,5 @@ KJ_TEST("exclusiveJoin both events complete simultaneously") {
KJ_EXPECT(!joined.poll(waitScope)); KJ_EXPECT(!joined.poll(waitScope));
} }
KJ_TEST("synchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}).wait(waitScope));
Promise<uint> promise = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456u;
});
KJ_EXPECT(promise.wait(waitScope) == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("synchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(i == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}).wait(waitScope));
Promise<uint> promise2 = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(promise2.wait(waitScope) == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event before it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
*executor.lockExclusive() = getCurrentThreadExecutor();
// We never run the loop here, so that when the event is canceled, it's still queued.
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() { return 123u; });
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event while it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> {
return kj::NEVER_DONE;
});
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
exec->executeSync([&]() { fulfiller->fulfill(); });
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cross-thread cancellation in both directions at once") {
MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
MutexGuarded<uint> readyCount;
// Code to execute simultaneously in two threads...
// We mark this noexcept so that any exceptions thrown will immediately invoke the termination
// handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor) noexcept {
EventLoop loop;
WaitScope waitScope(loop);
*selfExecutor.lockExclusive() = getCurrentThreadExecutor();
const Executor* exec;
{
auto lock = otherExecutor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
// Create a ton of cross-thread promises to cancel.
Vector<Promise<void>> promises;
for (uint i = 0; i < 1000; i++) {
promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
return kj::NEVER_DONE;
}));
}
// Signal other thread that we're done queueing, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 2; });
}
// Run event loop to start all executions queued by the other thread.
waitScope.poll();
loop.run();
// Signal other thread that we've run the loop, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 4; });
}
// Cancel all the promises.
promises.clear();
// Signal other that we're all done.
*otherExecutor.lockExclusive() = nullptr;
// Wait until other thread sets executor to null, as a way to tell us to quit.
selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
Thread thread([&]() {
simultaneous(childExecutor, parentExecutor);
});
simultaneous(parentExecutor, childExecutor);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "async.h"
#include "debug.h"
#include "thread.h"
#include "mutex.h"
#include <kj/test.h>
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#include <windows.h>
inline void delay() { Sleep(10); }
#else
#include <unistd.h>
inline void delay() { usleep(10000); }
#endif
namespace kj {
namespace {
KJ_TEST("synchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous simple cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}).wait(waitScope));
Promise<uint> promise = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456u;
});
KJ_EXPECT(promise.wait(waitScope) == 456);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("synchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(i == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("asynchonous promise cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
auto paf2 = newPromiseAndFulfiller<uint>();
promise = kj::mv(paf2.promise);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
paf2.fulfiller->fulfill(321);
// Make sure reply gets sent.
loop.run();
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeAsync([&]() {
KJ_ASSERT(isChild);
return kj::Promise<void>(KJ_EXCEPTION(FAILED, "test exception"));
}).wait(waitScope));
Promise<uint> promise2 = exec->executeAsync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return kj::mv(promise);
});
KJ_EXPECT(promise2.wait(waitScope) == 321);
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event before it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
*executor.lockExclusive() = getCurrentThreadExecutor();
// We never run the loop here, so that when the event is canceled, it's still queued.
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() { return 123u; });
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cancel cross-thread event while it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread
// We use `noexcept` so that any uncaught exceptions immediately terminate the process without
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> {
return kj::NEVER_DONE;
});
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
exec->executeSync([&]() { fulfiller->fulfill(); });
*executor.lockExclusive() = nullptr;
})();
}
KJ_TEST("cross-thread cancellation in both directions at once") {
MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
MutexGuarded<uint> readyCount;
thread_local bool isChild = false;
// Code to execute simultaneously in two threads...
// We mark this noexcept so that any exceptions thrown will immediately invoke the termination
// handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor) noexcept {
EventLoop loop;
WaitScope waitScope(loop);
*selfExecutor.lockExclusive() = getCurrentThreadExecutor();
const Executor* exec;
{
auto lock = otherExecutor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
// Create a ton of cross-thread promises to cancel.
Vector<Promise<void>> promises;
for (uint i = 0; i < 1000; i++) {
promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
return kj::Promise<void>(kj::NEVER_DONE)
.attach(kj::defer([wasChild = isChild]() {
// Make sure destruction happens in the correct thread.
KJ_ASSERT(isChild == wasChild);
}));
}));
}
// Signal other thread that we're done queueing, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 2; });
}
// Run event loop to start all executions queued by the other thread.
waitScope.poll();
loop.run();
// Signal other thread that we've run the loop, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 4; });
}
// Cancel all the promises.
promises.clear();
// All our cancellations completed, but the other thread may still be waiting for some
// cancellations from us. We need to pump our event loop to make sure we continue handling
// those cancellation requests. In particular we'll queue a function to the other thread and
// wait for it to complete. The other thread will queue its own function to this thread just
// before completing the function we queued to it.
exec->executeAsync([]() {}).wait(waitScope);
// To be safe, make sure we've actually executed the function that the other thread queued to
// us by running the loop one last time.
loop.run();
// OK, signal other that we're all done.
*otherExecutor.lockExclusive() = nullptr;
// Wait until other thread sets executor to null, as a way to tell us to quit.
selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
Thread thread([&]() {
isChild = true;
simultaneous(childExecutor, parentExecutor);
});
simultaneous(parentExecutor, childExecutor);
}
} // namespace
} // namespace kj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment