Commit 114e20cc authored by Kenton Varda's avatar Kenton Varda

Add tests for cross-thread-event cancellation, fix bug.

parent b715ecaf
...@@ -25,6 +25,15 @@ ...@@ -25,6 +25,15 @@
#include "mutex.h" #include "mutex.h"
#include <kj/compat/gtest.h> #include <kj/compat/gtest.h>
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#include <windows.h>
inline void delay() { Sleep(10); }
#else
#include <unistd.h>
inline void delay() { usleep(10000); }
#endif
namespace kj { namespace kj {
namespace { namespace {
...@@ -1084,5 +1093,105 @@ KJ_TEST("asynchonous promise cross-thread events") { ...@@ -1084,5 +1093,105 @@ KJ_TEST("asynchonous promise cross-thread events") {
} }
} }
KJ_TEST("cancel cross-thread event before it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
EventLoop loop;
WaitScope waitScope(loop);
*executor.lockExclusive() = getCurrentThreadExecutor();
// We never run the loop here, so that when the event is canceled, it's still queued.
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
EventLoop loop;
WaitScope waitScope(loop);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() { return 123u; });
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
KJ_TEST("cancel cross-thread event while it runs") {
MutexGuarded<kj::Maybe<const Executor&>> executor; // to get the Executor from the other thread
Own<PromiseFulfiller<void>> fulfiller; // accessed only from the subthread
Thread thread([&]() {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
})) {
// Log here because it's likely the parent thread will never join and we'll hang forever
// without propagating the exception.
KJ_LOG(ERROR, *exception);
kj::throwRecoverableException(kj::mv(*exception));
}
});
EventLoop loop;
WaitScope waitScope(loop);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
const Executor* exec;
{
auto lock = executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
{
Promise<uint> promise = exec->executeAsync([&]() -> kj::Promise<uint> {
return kj::NEVER_DONE;
});
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
exec->executeSync([&]() { fulfiller->fulfill(); });
*executor.lockExclusive() = nullptr;
})) {
// Log here because the thread join is likely to hang forever...
KJ_FAIL_EXPECT(*exception);
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -349,7 +349,6 @@ struct Executor::Impl { ...@@ -349,7 +349,6 @@ struct Executor::Impl {
event.disarm(); event.disarm();
event.state = _::XThreadEvent::DONE; event.state = _::XThreadEvent::DONE;
} }
event.armBreadthFirst();
}); });
replies.forEach([&](_::XThreadEvent& event) { replies.forEach([&](_::XThreadEvent& event) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment