Commit 776eee73 authored by Kenton Varda's avatar Kenton Varda

Apply all xthread tests to EventPort implementations, fix bugs.

I totally forgot to add wake()s in multiple spots.
parent 5d175beb
......@@ -462,7 +462,9 @@ heavy_tests = \
src/kj/async-test.c++ \
src/kj/async-xthread-test.c++ \
src/kj/async-unix-test.c++ \
src/kj/async-unix-xthread-test.c++ \
src/kj/async-win32-test.c++ \
src/kj/async-win32-xthread-test.c++ \
src/kj/async-io-test.c++ \
src/kj/parse/common-test.c++ \
src/kj/parse/char-test.c++ \
......
......@@ -217,7 +217,9 @@ if(BUILD_TESTING)
async-test.c++
async-xthread-test.c++
async-unix-test.c++
async-unix-xthread-test.c++
async-win32-test.c++
async-win32-xthread-test.c++
async-io-test.c++
refcount-test.c++
string-tree-test.c++
......
......@@ -775,67 +775,6 @@ TEST(AsyncUnixTest, ChildProcess) {
// child3 will be killed and synchronously waited on the way out.
}
KJ_TEST("UnixEventPort cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace
} // namespace kj
......
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if !_WIN32
#include "async-unix.h"
#define KJ_XTHREAD_TEST_SETUP_LOOP \
UnixEventPort port; \
EventLoop loop(port); \
WaitScope waitScope(loop)
#include "async-xthread-test.c++"
#endif // !_WIN32
......@@ -163,66 +163,6 @@ KJ_TEST("Win32IocpEventPort APC") {
paf.promise.wait(waitScope);
}
KJ_TEST("Win32IocpEventPort cross-thread events") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<uint>> fulfiller; // accessed only from the subthread
Promise<uint> promise = nullptr; // accessed only from the subthread
thread_local bool isChild = false; // to assert which thread we're in
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
isChild = true;
Win32IocpEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
KJ_ASSERT(paf.promise.wait(waitScope) == 123);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
KJ_ASSERT(!isChild);
KJ_EXPECT_THROW_MESSAGE("test exception", exec->executeSync([&]() {
KJ_ASSERT(isChild);
KJ_FAIL_ASSERT("test exception") { break; }
}));
uint i = exec->executeSync([&]() {
KJ_ASSERT(isChild);
fulfiller->fulfill(123);
return 456;
});
KJ_EXPECT(i == 456);
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace
} // namespace kj
......
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#include "async-win32.h"
#define KJ_XTHREAD_TEST_SETUP_LOOP \
Win32IocpEventPort port; \
EventLoop loop(port); \
WaitScope waitScope(loop)
#include "async-xthread-test.c++"
#endif // _WIN32
......@@ -34,6 +34,14 @@ inline void delay() { Sleep(10); }
inline void delay() { usleep(10000); }
#endif
// This file is #included from async-unix-xthread-test.c++ and async-win32-xthread-test.c++ after
// defining KJ_XTHREAD_TEST_SETUP_LOOP to set up a loop with the corresponding EventPort.
#ifndef KJ_XTHREAD_TEST_SETUP_LOOP
#define KJ_XTHREAD_TEST_SETUP_LOOP \
EventLoop loop; \
WaitScope waitScope(loop)
#endif
namespace kj {
namespace {
......@@ -48,8 +56,7 @@ KJ_TEST("synchonous simple cross-thread events") {
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
......@@ -99,8 +106,7 @@ KJ_TEST("asynchonous simple cross-thread events") {
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
......@@ -114,8 +120,7 @@ KJ_TEST("asynchonous simple cross-thread events") {
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
......@@ -154,8 +159,7 @@ KJ_TEST("synchonous promise cross-thread events") {
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
......@@ -214,8 +218,7 @@ KJ_TEST("asynchonous promise cross-thread events") {
Thread thread([&]() noexcept {
isChild = true;
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<uint>();
fulfiller = kj::mv(paf.fulfiller);
......@@ -237,8 +240,7 @@ KJ_TEST("asynchonous promise cross-thread events") {
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
......@@ -272,8 +274,7 @@ KJ_TEST("cancel cross-thread event before it runs") {
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
*executor.lockExclusive() = getCurrentThreadExecutor();
......@@ -284,8 +285,7 @@ KJ_TEST("cancel cross-thread event before it runs") {
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
......@@ -312,8 +312,7 @@ KJ_TEST("cancel cross-thread event while it runs") {
// unwinding. Otherwise, the unwind would likely deadlock waiting for some synchronization with
// the other thread.
Thread thread([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
......@@ -327,8 +326,7 @@ KJ_TEST("cancel cross-thread event while it runs") {
});
([&]() noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
const Executor* exec;
{
......@@ -364,8 +362,7 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
// handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor) noexcept {
EventLoop loop;
WaitScope waitScope(loop);
KJ_XTHREAD_TEST_SETUP_LOOP;
*selfExecutor.lockExclusive() = getCurrentThreadExecutor();
......
......@@ -412,6 +412,10 @@ void XThreadEvent::ensureDoneOrCanceled() {
break;
case EXECUTING: {
lock->cancel.insert(*this);
KJ_IF_MAYBE(p, targetExecutor.loop.port) {
p->wake();
}
Maybe<Executor&> maybeSelfExecutor = nullptr;
if (threadLocalEventLoop != nullptr) {
KJ_IF_MAYBE(e, threadLocalEventLoop->executor) {
......@@ -515,8 +519,14 @@ void XThreadEvent::ensureDoneOrCanceled() {
void XThreadEvent::done() {
KJ_IF_MAYBE(e, replyExecutor) {
// Queue the reply.
auto lock = e->impl->state.lockExclusive();
lock->replies.insert(*this);
{
auto lock = e->impl->state.lockExclusive();
lock->replies.insert(*this);
}
KJ_IF_MAYBE(p, e->loop.port) {
p->wake();
}
}
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment