Commit b71d3cc2 authored by Kenton Varda

Test xthread cycles better, make cancellation fully synchronous.

* Extend the existing bidirectional cancellation thread to a three-thread case.
* Also create a three-thread case where the events actually depend on each other in a cycle.

The second new test made me realize a bigger problem: Cancellation really needs to destroy the PromiseNode in the remote thread synchronously before acknowledging, because the destructors of that PromiseNode could access objects captured from the requesting thread. So, I went ahead and fixed that.
parent dd1ecd8c
...@@ -354,15 +354,17 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -354,15 +354,17 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
MutexGuarded<kj::Maybe<const Executor&>> childExecutor; MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
MutexGuarded<kj::Maybe<const Executor&>> parentExecutor; MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
MutexGuarded<uint> readyCount; MutexGuarded<uint> readyCount(0);
thread_local bool isChild = false; thread_local uint threadNumber = 0;
thread_local bool receivedFinalCall = false;
// Code to execute simultaneously in two threads... // Code to execute simultaneously in two threads...
// We mark this noexcept so that any exceptions thrown will immediately invoke the termination // We mark this noexcept so that any exceptions thrown will immediately invoke the termination
// handler, skipping any destructors that would deadlock. // handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor, auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor) noexcept { MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor,
uint threadCount) noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP; KJ_XTHREAD_TEST_SETUP_LOOP;
*selfExecutor.lockExclusive() = getCurrentThreadExecutor(); *selfExecutor.lockExclusive() = getCurrentThreadExecutor();
...@@ -379,9 +381,9 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -379,9 +381,9 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
for (uint i = 0; i < 1000; i++) { for (uint i = 0; i < 1000; i++) {
promises.add(exec->executeAsync([&]() -> kj::Promise<void> { promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
return kj::Promise<void>(kj::NEVER_DONE) return kj::Promise<void>(kj::NEVER_DONE)
.attach(kj::defer([wasChild = isChild]() { .attach(kj::defer([wasThreadNumber = threadNumber]() {
// Make sure destruction happens in the correct thread. // Make sure destruction happens in the correct thread.
KJ_ASSERT(isChild == wasChild); KJ_ASSERT(threadNumber == wasThreadNumber);
})); }));
})); }));
} }
...@@ -390,7 +392,7 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -390,7 +392,7 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
{ {
auto lock = readyCount.lockExclusive(); auto lock = readyCount.lockExclusive();
++*lock; ++*lock;
lock.wait([](uint i) { return i == 2; }); lock.wait([&](uint i) { return i == threadCount; });
} }
// Run event loop to start all executions queued by the other thread. // Run event loop to start all executions queued by the other thread.
...@@ -401,7 +403,7 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -401,7 +403,7 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
{ {
auto lock = readyCount.lockExclusive(); auto lock = readyCount.lockExclusive();
++*lock; ++*lock;
lock.wait([](uint i) { return i == 4; }); lock.wait([&](uint i) { return i == threadCount * 2; });
} }
// Cancel all the promises. // Cancel all the promises.
...@@ -412,11 +414,15 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -412,11 +414,15 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
// those cancellation requests. In particular we'll queue a function to the other thread and // those cancellation requests. In particular we'll queue a function to the other thread and
// wait for it to complete. The other thread will queue its own function to this thread just // wait for it to complete. The other thread will queue its own function to this thread just
// before completing the function we queued to it. // before completing the function we queued to it.
exec->executeAsync([]() {}).wait(waitScope); receivedFinalCall = false;
exec->executeAsync([&]() { receivedFinalCall = true; }).wait(waitScope);
// To be safe, make sure we've actually executed the function that the other thread queued to // To be safe, make sure we've actually executed the function that the other thread queued to
// us by running the loop one last time. // us by repeatedly polling until `receivedFinalCall` becomes true in this thread.
loop.run(); while (!receivedFinalCall) {
waitScope.poll();
loop.run();
}
// OK, signal other that we're all done. // OK, signal other that we're all done.
*otherExecutor.lockExclusive() = nullptr; *otherExecutor.lockExclusive() = nullptr;
...@@ -425,12 +431,103 @@ KJ_TEST("cross-thread cancellation in both directions at once") { ...@@ -425,12 +431,103 @@ KJ_TEST("cross-thread cancellation in both directions at once") {
selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; }); selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
}; };
Thread thread([&]() { {
isChild = true; Thread thread([&]() {
simultaneous(childExecutor, parentExecutor); threadNumber = 1;
}); simultaneous(childExecutor, parentExecutor, 2);
});
threadNumber = 0;
simultaneous(parentExecutor, childExecutor, 2);
}
// Let's even have a three-thread version, with cyclic cancellation requests.
MutexGuarded<kj::Maybe<const Executor&>> child2Executor;
*readyCount.lockExclusive() = 0;
{
Thread thread1([&]() {
threadNumber = 1;
simultaneous(childExecutor, child2Executor, 3);
});
Thread thread2([&]() {
threadNumber = 2;
simultaneous(child2Executor, parentExecutor, 3);
});
threadNumber = 0;
simultaneous(parentExecutor, childExecutor, 3);
}
}
KJ_TEST("cross-thread cancellation cycle") {
// Another multi-way cancellation test where we set up an actual cycle between three threads
// waiting on each other to complete a single event.
MutexGuarded<kj::Maybe<const Executor&>> child1Executor, child2Executor;
Own<PromiseFulfiller<void>> fulfiller1, fulfiller2;
auto threadMain = [](MutexGuarded<kj::Maybe<const Executor&>>& executor,
Own<PromiseFulfiller<void>>& fulfiller) noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto paf = newPromiseAndFulfiller<void>();
fulfiller = kj::mv(paf.fulfiller);
*executor.lockExclusive() = getCurrentThreadExecutor();
simultaneous(parentExecutor, childExecutor); paf.promise.wait(waitScope);
// Wait until parent thread sets executor to null, as a way to tell us to quit.
executor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
Thread thread1([&]() noexcept { threadMain(child1Executor, fulfiller1); });
Thread thread2([&]() noexcept { threadMain(child2Executor, fulfiller2); });
([&]() noexcept {
KJ_XTHREAD_TEST_SETUP_LOOP;
auto& parentExecutor = getCurrentThreadExecutor();
const Executor* exec1;
{
auto lock = child1Executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec1 = &KJ_ASSERT_NONNULL(*lock);
}
const Executor* exec2;
{
auto lock = child2Executor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec2 = &KJ_ASSERT_NONNULL(*lock);
}
// Create an event that cycles through both threads and back to this one, and then cancel it.
bool cycleAllDestroyed = false;
{
Promise<uint> promise = exec1->executeAsync([&]() -> kj::Promise<uint> {
return exec2->executeAsync([&]() -> kj::Promise<uint> {
return parentExecutor.executeAsync([&]() -> kj::Promise<uint> {
return kj::Promise<uint>(kj::NEVER_DONE).attach(kj::defer([&]() {
cycleAllDestroyed = true;
}));
});
});
});
delay();
KJ_EXPECT(!promise.poll(waitScope));
}
KJ_EXPECT(cycleAllDestroyed);
exec1->executeSync([&]() { fulfiller1->fulfill(); });
exec2->executeSync([&]() { fulfiller2->fulfill(); });
*child1Executor.lockExclusive() = nullptr;
*child2Executor.lockExclusive() = nullptr;
})();
} }
KJ_TEST("call own thread's executor") { KJ_TEST("call own thread's executor") {
......
...@@ -348,14 +348,14 @@ struct Executor::Impl { ...@@ -348,14 +348,14 @@ struct Executor::Impl {
return run.empty() && cancel.empty() && replies.empty(); return run.empty() && cancel.empty() && replies.empty();
} }
void dispatchAll(Vector<Own<_::PromiseNode>>& nodesToDeleteOutsideLock) { void dispatchAll(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
run.forEach([&](_::XThreadEvent& event) { run.forEach([&](_::XThreadEvent& event) {
run.erase(event); run.erase(event);
event.state = _::XThreadEvent::EXECUTING; event.state = _::XThreadEvent::EXECUTING;
event.armBreadthFirst(); event.armBreadthFirst();
}); });
dispatchCancels(nodesToDeleteOutsideLock); dispatchCancels(eventsToCancelOutsideLock);
replies.forEach([&](_::XThreadEvent& event) { replies.forEach([&](_::XThreadEvent& event) {
replies.erase(event); replies.erase(event);
...@@ -363,25 +363,16 @@ struct Executor::Impl { ...@@ -363,25 +363,16 @@ struct Executor::Impl {
}); });
} }
void dispatchCancels(Vector<Own<_::PromiseNode>>& nodesToDeleteOutsideLock) { void dispatchCancels(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
cancel.forEach([&](_::XThreadEvent& event) { cancel.forEach([&](_::XThreadEvent& event) {
cancel.erase(event); cancel.erase(event);
if (event.state == _::XThreadEvent::EXECUTING) {
KJ_IF_MAYBE(n, event.promiseNode) { KJ_IF_MAYBE(n, event.promiseNode) {
// As a precaution, remove the onReady event pointer. This is probably not needed // We can't destroy the promiseNode while the mutex is locked, because we don't know
// because it would be unusual for the destructor of a PromiseNode to try to access // what the destructor might do. But, we *must* destroy it before acknowledging
// the onReady event pointer, but let's avoid having a dangling pointer in the first // cancellation. So we have to add it to a list to destroy later.
// place. eventsToCancelOutsideLock.add(&event);
n->get()->onReady(nullptr); } else {
// Schedule to delete the node as soon as we drop the lock. It's important to drop the
// lock first because we have no idea what the destructor might do -- it's entirely
// possible the destructor will want to take the same lock.
nodesToDeleteOutsideLock.add(kj::mv(*n));
event.promiseNode = nullptr;
}
event.disarm();
event.state = _::XThreadEvent::DONE; event.state = _::XThreadEvent::DONE;
} }
}); });
...@@ -390,6 +381,25 @@ struct Executor::Impl { ...@@ -390,6 +381,25 @@ struct Executor::Impl {
kj::MutexGuarded<State> state; kj::MutexGuarded<State> state;
// After modifying state from another thread, the loop's port.wake() must be called. // After modifying state from another thread, the loop's port.wake() must be called.
void processAsyncCancellations(Vector<_::XThreadEvent*>& eventsToCancelOutsideLock) {
// After calling dispatchAll() or dispatchCancels() with the lock held, it may be that some
// cancellations require dropping the lock before destroying the promiseNode. In that case
// those cancellations will be added to the eventsToCancelOutsideLock Vector passed to the
// method. That vector must then be passed to processAsyncCancellations() as soon as the lock
// is released.
for (auto& event: eventsToCancelOutsideLock) {
event->promiseNode = nullptr;
event->disarm();
}
// Now we need to mark all the events "done" under lock.
auto lock = state.lockExclusive();
for (auto& event: eventsToCancelOutsideLock) {
event->state = _::XThreadEvent::DONE;
}
}
}; };
namespace _ { // (private) namespace _ { // (private)
...@@ -400,7 +410,6 @@ void XThreadEvent::ensureDoneOrCanceled() { ...@@ -400,7 +410,6 @@ void XThreadEvent::ensureDoneOrCanceled() {
#else #else
if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) { if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
#endif #endif
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock;
auto lock = targetExecutor.impl->state.lockExclusive(); auto lock = targetExecutor.impl->state.lockExclusive();
switch (state) { switch (state) {
case UNUSED: case UNUSED:
...@@ -442,9 +451,12 @@ void XThreadEvent::ensureDoneOrCanceled() { ...@@ -442,9 +451,12 @@ void XThreadEvent::ensureDoneOrCanceled() {
KJ_DEFER({ KJ_DEFER({
lock = {}; lock = {};
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive(); auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = false; selfLock->waitingForCancel = false;
selfLock->dispatchCancels(nodesToDeleteOutsideLock); selfLock->dispatchCancels(eventsToCancelOutsideLock);
// We don't need to re-take the lock on the other executor here; it's not used again // We don't need to re-take the lock on the other executor here; it's not used again
// after this scope. // after this scope.
...@@ -457,13 +469,16 @@ void XThreadEvent::ensureDoneOrCanceled() { ...@@ -457,13 +469,16 @@ void XThreadEvent::ensureDoneOrCanceled() {
// thread. // thread.
lock = {}; lock = {};
{ {
Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(selfExecutor->impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto selfLock = selfExecutor->impl->state.lockExclusive(); auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = true; selfLock->waitingForCancel = true;
// Note that we don't have to proactively delete the PromiseNodes extracted from // Note that we don't have to proactively delete the PromiseNodes extracted from
// the canceled events because those nodes belong to this thread and can't possibly // the canceled events because those nodes belong to this thread and can't possibly
// continue executing while we're blocked here. // continue executing while we're blocked here.
selfLock->dispatchCancels(nodesToDeleteOutsideLock); selfLock->dispatchCancels(eventsToCancelOutsideLock);
} }
if (otherThreadIsWaiting) { if (otherThreadIsWaiting) {
...@@ -576,6 +591,7 @@ Maybe<Own<Event>> XThreadEvent::fire() { ...@@ -576,6 +591,7 @@ Maybe<Own<Event>> XThreadEvent::fire() {
KJ_IF_MAYBE(n, promiseNode) { KJ_IF_MAYBE(n, promiseNode) {
n->get()->get(result); n->get()->get(result);
promiseNode = nullptr; // make sure to destroy in the thread that created it
return Own<Event>(this, DISPOSER); return Own<Event>(this, DISPOSER);
} else { } else {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
...@@ -642,7 +658,8 @@ void Executor::send(_::XThreadEvent& event, bool sync) const { ...@@ -642,7 +658,8 @@ void Executor::send(_::XThreadEvent& event, bool sync) const {
} }
void Executor::wait() { void Executor::wait() {
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock; Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive(); auto lock = impl->state.lockExclusive();
...@@ -650,17 +667,18 @@ void Executor::wait() { ...@@ -650,17 +667,18 @@ void Executor::wait() {
return !state.empty(); return !state.empty();
}); });
lock->dispatchAll(nodesToDeleteOutsideLock); lock->dispatchAll(eventsToCancelOutsideLock);
} }
bool Executor::poll() { bool Executor::poll() {
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock; Vector<_::XThreadEvent*> eventsToCancelOutsideLock;
KJ_DEFER(impl->processAsyncCancellations(eventsToCancelOutsideLock));
auto lock = impl->state.lockExclusive(); auto lock = impl->state.lockExclusive();
if (lock->empty()) { if (lock->empty()) {
return false; return false;
} else { } else {
lock->dispatchAll(nodesToDeleteOutsideLock); lock->dispatchAll(eventsToCancelOutsideLock);
return true; return true;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment