Commit ff4d2996 authored by Kenton Varda's avatar Kenton Varda

Fix deadlock when two threads are waiting on each other for cross-thread event cancellation.

parent 55a6731a
...@@ -1151,5 +1151,71 @@ KJ_TEST("cancel cross-thread event while it runs") { ...@@ -1151,5 +1151,71 @@ KJ_TEST("cancel cross-thread event while it runs") {
})(); })();
} }
KJ_TEST("cross-thread cancellation in both directions at once") {
MutexGuarded<kj::Maybe<const Executor&>> childExecutor;
MutexGuarded<kj::Maybe<const Executor&>> parentExecutor;
MutexGuarded<uint> readyCount;
// Code to execute simultaneously in two threads...
// We mark this noexcept so that any exceptions thrown will immediately invoke the termination
// handler, skipping any destructors that would deadlock.
auto simultaneous = [&](MutexGuarded<kj::Maybe<const Executor&>>& selfExecutor,
MutexGuarded<kj::Maybe<const Executor&>>& otherExecutor) noexcept {
EventLoop loop;
WaitScope waitScope(loop);
*selfExecutor.lockExclusive() = getCurrentThreadExecutor();
const Executor* exec;
{
auto lock = otherExecutor.lockExclusive();
lock.wait([&](kj::Maybe<const Executor&> value) { return value != nullptr; });
exec = &KJ_ASSERT_NONNULL(*lock);
}
// Create a ton of cross-thread promises to cancel.
Vector<Promise<void>> promises;
for (uint i = 0; i < 1000; i++) {
promises.add(exec->executeAsync([&]() -> kj::Promise<void> {
return kj::NEVER_DONE;
}));
}
// Signal other thread that we're done queueing, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 2; });
}
// Run event loop to start all executions queued by the other thread.
waitScope.poll();
loop.run();
// Signal other thread that we've run the loop, and wait for it to signal same.
{
auto lock = readyCount.lockExclusive();
++*lock;
lock.wait([](uint i) { return i == 4; });
}
// Cancel all the promises.
promises.clear();
// Signal other that we're all done.
*otherExecutor.lockExclusive() = nullptr;
// Wait until other thread sets executor to null, as a way to tell us to quit.
selfExecutor.lockExclusive().wait([](auto& val) { return val == nullptr; });
};
Thread thread([&]() {
simultaneous(childExecutor, parentExecutor);
});
simultaneous(parentExecutor, childExecutor);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -19,12 +19,22 @@ ...@@ -19,12 +19,22 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE. // THE SOFTWARE.
#if _WIN32
#define WIN32_LEAN_AND_MEAN 1 // lolz
#endif
#include "async.h" #include "async.h"
#include "debug.h" #include "debug.h"
#include "vector.h" #include "vector.h"
#include "threadlocal.h" #include "threadlocal.h"
#include "mutex.h" #include "mutex.h"
#if _WIN32
#include <windows.h> // just for Sleep(0)
#else
#include <sched.h> // just for sched_yield()
#endif
#if KJ_USE_FUTEX #if KJ_USE_FUTEX
#include <unistd.h> #include <unistd.h>
#include <sys/syscall.h> #include <sys/syscall.h>
...@@ -328,6 +338,11 @@ struct Executor::Impl { ...@@ -328,6 +338,11 @@ struct Executor::Impl {
List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> cancel; List<&_::XThreadEvent::targetNext, &_::XThreadEvent::targetPrev> cancel;
List<&_::XThreadEvent::replyNext, &_::XThreadEvent::replyPrev> replies; List<&_::XThreadEvent::replyNext, &_::XThreadEvent::replyPrev> replies;
bool waitingForCancel = false;
// True if this thread is currently blocked waiting for some other thread to pump its
// cancellation queue. If that other thread tries to block on *this* thread, then it could
// deadlock -- it must take precautions against this.
bool empty() const { bool empty() const {
return run.empty() && cancel.empty() && replies.empty(); return run.empty() && cancel.empty() && replies.empty();
} }
...@@ -339,22 +354,36 @@ struct Executor::Impl { ...@@ -339,22 +354,36 @@ struct Executor::Impl {
event.armBreadthFirst(); event.armBreadthFirst();
}); });
dispatchCancels(nodesToDeleteOutsideLock);
replies.forEach([&](_::XThreadEvent& event) {
replies.erase(event);
event.onReadyEvent.armBreadthFirst();
});
}
void dispatchCancels(Vector<Own<_::PromiseNode>>& nodesToDeleteOutsideLock) {
cancel.forEach([&](_::XThreadEvent& event) { cancel.forEach([&](_::XThreadEvent& event) {
cancel.erase(event); cancel.erase(event);
if (event.state == _::XThreadEvent::EXECUTING) { if (event.state == _::XThreadEvent::EXECUTING) {
KJ_IF_MAYBE(n, event.promiseNode) { KJ_IF_MAYBE(n, event.promiseNode) {
// As a precaution, remove the onReady event pointer. This is probably not needed
// because it would be unusual for the destructor of a PromiseNode to try to access
// the onReady event pointer, but let's avoid having a dangling pointer in the first
// place.
n->get()->onReady(nullptr);
// Schedule to delete the node as soon as we drop the lock. It's important to drop the
// lock first because we have no idea what the destructor might do -- it's entirely
// possible the destructor will want to take the same lock.
nodesToDeleteOutsideLock.add(kj::mv(*n)); nodesToDeleteOutsideLock.add(kj::mv(*n));
event.promiseNode = nullptr; event.promiseNode = nullptr;
} }
event.disarm(); event.disarm();
event.state = _::XThreadEvent::DONE; event.state = _::XThreadEvent::DONE;
} }
}); });
replies.forEach([&](_::XThreadEvent& event) {
replies.erase(event);
event.onReadyEvent.armBreadthFirst();
});
} }
}; };
...@@ -370,6 +399,7 @@ void XThreadEvent::ensureDoneOrCanceled() { ...@@ -370,6 +399,7 @@ void XThreadEvent::ensureDoneOrCanceled() {
#else #else
if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) { if (__atomic_load_n(&state, __ATOMIC_ACQUIRE) != DONE) {
#endif #endif
Vector<Own<_::PromiseNode>> nodesToDeleteOutsideLock;
auto lock = targetExecutor.impl->state.lockExclusive(); auto lock = targetExecutor.impl->state.lockExclusive();
switch (state) { switch (state) {
case UNUSED: case UNUSED:
...@@ -377,13 +407,94 @@ void XThreadEvent::ensureDoneOrCanceled() { ...@@ -377,13 +407,94 @@ void XThreadEvent::ensureDoneOrCanceled() {
break; break;
case QUEUED: case QUEUED:
lock->run.erase(*this); lock->run.erase(*this);
// No wake needed since we removed work rather than adding it.
state = DONE; state = DONE;
break; break;
case EXECUTING: case EXECUTING: {
lock->cancel.insert(*this); lock->cancel.insert(*this);
lock.wait([&](auto&) { return state == DONE; }); Maybe<Executor&> maybeSelfExecutor = nullptr;
if (threadLocalEventLoop != nullptr) {
KJ_IF_MAYBE(e, threadLocalEventLoop->executor) {
maybeSelfExecutor = *e;
}
}
KJ_IF_MAYBE(selfExecutor, maybeSelfExecutor) {
// If, while waiting for other threads to process our cancellation request, we have
// cancellation requests queued back to this thread, we must process them. Otherwise,
// we could deadlock with two threads waiting on each other to process cancellations.
//
// We don't have a terribly good way to detect this, except to check if the remote
// thread is itself waiting for cancellations and, if so, wake ourselves up to check for
// cancellations to process. This will busy-loop but at least it should eventually
// resolve assuming fair scheduling.
//
// To make things extra-annoying, in order to update our waitingForCancel flag, we have
// to lock our own executor state, but we can't take both locks at once, so we have to
// release the other lock in the meantime.
// Make sure we unset waitingForCancel on the way out.
KJ_DEFER({
lock = {};
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = false;
selfLock->dispatchCancels(nodesToDeleteOutsideLock);
// We don't need to re-take the lock on the other executor here; it's not used again
// after this scope.
});
while (state != DONE) {
bool otherThreadIsWaiting = lock->waitingForCancel;
// Make sure our waitingForCancel is on and dispatch any pending cancellations on this
// thread.
lock = {};
{
auto selfLock = selfExecutor->impl->state.lockExclusive();
selfLock->waitingForCancel = true;
// Note that we don't have to proactively delete the PromiseNodes extracted from
// the canceled events because those nodes belong to this thread and can't possibly
// continue executing while we're blocked here.
selfLock->dispatchCancels(nodesToDeleteOutsideLock);
}
if (otherThreadIsWaiting) {
// We know the other thread was waiting for cancellations to complete a moment ago.
// We may have just processed the necessary cancellations in this thread, in which
// case the other thread needs a chance to receive control and notice this. Or, it
// may be that the other thread is waiting for some third thread to take action.
// Either way, we should yield control here to give things a chance to settle.
// Otherwise we could end up in a tight busy loop.
#if _WIN32
Sleep(0);
#else
sched_yield();
#endif
}
// OK now we can take the original lock again.
lock = targetExecutor.impl->state.lockExclusive();
// OK, now we can wait for the other thread to either process our cancellation or
// indicate that it is waiting for remote cancellation.
lock.wait([&](const Executor::Impl::State& executorState) {
return state == DONE || executorState.waitingForCancel;
});
}
} else {
// We have no executor of our own so we don't have to worry about cancellation cycles
// causing deadlock.
//
// NOTE: I don't think we can actually get here, because it implies that this is a
// synchronous execution, which means there's no way to cancel it.
lock.wait([&](auto&) { return state == DONE; });
}
KJ_DASSERT(targetPrev == nullptr); KJ_DASSERT(targetPrev == nullptr);
break; break;
}
case DONE: case DONE:
// Became done while we waited for lock. Nothing to do. // Became done while we waited for lock. Nothing to do.
break; break;
......
...@@ -890,6 +890,7 @@ private: ...@@ -890,6 +890,7 @@ private:
friend class _::Event; friend class _::Event;
friend class WaitScope; friend class WaitScope;
friend class Executor; friend class Executor;
friend class _::XThreadEvent;
}; };
class WaitScope { class WaitScope {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment