Commit 2e3b3413 authored by Kenton Varda

Simplify event queuing -- always preempt from same thread, yield cross-thread.

parent 860af726
......@@ -46,7 +46,7 @@ public:
while (size > 0) {
size_t n = rand() % size + 1;
inner.write(buffer, n);
usleep(5);
usleep(10000);
buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
}
......
......@@ -337,6 +337,38 @@ TEST(Async, Ordering) {
EXPECT_EQ(7, counter);
}
TEST(Async, Spark) {
  // Tests that EventLoop::there() only runs eagerly when queued cross-thread.

  SimpleEventLoop loop;

  // Fulfilled from inside the evalLater() callback below. The final loop.wait() blocks on this
  // promise, so the test only completes if that callback actually runs.
  auto notification = newPromiseAndFulfiller<void>();

  // These are declared out here so that the promises created inside the `sparked` continuation
  // stay alive until the end of the test instead of being destroyed when the lambda returns.
  Promise<void> unsparked = nullptr;
  Promise<void> then = nullptr;
  Promise<void> later = nullptr;

  // `sparked` will evaluate eagerly, even though we never wait on it, because there() is being
  // called from outside the event loop.
  Promise<void> sparked = loop.there(Promise<void>(READY_NOW), [&]() {
    // `unsparked` will never execute because it's attached to the current loop and we never wait
    // on it.
    unsparked = loop.there(Promise<void>(READY_NOW), [&]() {
      ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
    });

    // `then` will similarly never execute.
    then = Promise<void>(READY_NOW).then([&]() {
      ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
    });

    // `evalLater` *does* eagerly execute even when queued to the same loop.
    later = loop.evalLater([&]() {
      notification.fulfiller->fulfill();
    });
  });

  // Run the loop until the evalLater() callback fulfills the notification.
  loop.wait(kj::mv(notification.promise));
}
TEST(Async, Fork) {
SimpleEventLoop loop;
......
......@@ -53,6 +53,20 @@ public:
}
};
class YieldPromiseNode final: public _::PromiseNode {
  // A promise node carrying no value, whose only effect is to queue the waiting event
  // non-preemptively.  Used by EventLoop::yieldIfSameThread().

public:
  bool onReady(EventLoop::Event& event) noexcept override {
    // Arm the waiting event ourselves with preemptIfSameThread = false, so that it is placed at
    // the end of the queue rather than ahead of already-queued events.  We then return false so
    // that the caller does not arm the event again itself.
    event.arm(false);
    return false;
  }

  void get(_::ExceptionOrValue& output) noexcept override {
    // There is no real result; just store a void value.
    output.as<_::Void>().value = _::Void();
  }

  Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
    // NOTE(review): presumably null means this node is not tied to any particular event loop --
    // confirm against PromiseNode::getSafeEventLoop()'s contract.
    return nullptr;
  }
};
} // namespace
EventLoop& EventLoop::current() {
......@@ -61,6 +75,10 @@ EventLoop& EventLoop::current() {
return *result;
}
bool EventLoop::isCurrent() const {
  // True iff this loop is the one recorded in the calling thread's thread-local slot.
  return this == threadLocalEventLoop;
}
// The event list head appears to be the queue's placeholder node rather than a real event, so it
// is never expected to fire. Reaching this function fails an assertion unconditionally.
void EventLoop::EventListHead::fire() {
KJ_FAIL_ASSERT("Fired event list head.");
}
......@@ -114,6 +132,10 @@ void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result)
node->get(result);
}
Promise<void> EventLoop::yieldIfSameThread() const {
  // Wrap a freshly allocated YieldPromiseNode in a Promise.  When something waits on the node,
  // it arms the waiting event non-preemptively (see YieldPromiseNode::onReady()).
  Own<_::PromiseNode> yieldNode = kj::heap<YieldPromiseNode>();
  return Promise<void>(false, kj::mv(yieldNode));
}
EventLoop::Event::~Event() noexcept(false) {
if (this != &loop.queue) {
KJ_ASSERT(next == nullptr || std::uncaught_exception(),
......@@ -122,36 +144,32 @@ EventLoop::Event::~Event() noexcept(false) {
}
}
void EventLoop::Event::arm(Schedule schedule) {
void EventLoop::Event::arm(bool preemptIfSameThread) {
loop.queue.mutex.lock(_::Mutex::EXCLUSIVE);
KJ_DEFER(loop.queue.mutex.unlock(_::Mutex::EXCLUSIVE));
if (next == nullptr) {
bool queueIsEmpty = loop.queue.next == &loop.queue;
switch (schedule) {
case PREEMPT:
// Insert the event into the queue. We put it at the front rather than the back so that
// related events are executed together and so that increasing the granularity of events
// does not cause your code to "lose priority" compared to simultaneously-running code
// with less granularity.
next = loop.insertPoint;
prev = next->prev;
next->prev = this;
prev->next = this;
break;
case YIELD:
// Insert the node at the *end* of the queue.
prev = loop.queue.prev;
next = prev->next;
prev->next = this;
next->prev = this;
if (loop.insertPoint == &loop.queue) {
loop.insertPoint = this;
}
break;
if (preemptIfSameThread && threadLocalEventLoop == &loop) {
// Insert the event into the queue. We put it at the front rather than the back so that
// related events are executed together and so that increasing the granularity of events
// does not cause your code to "lose priority" compared to simultaneously-running code
// with less granularity.
next = loop.insertPoint;
prev = next->prev;
next->prev = this;
prev->next = this;
} else {
// Insert the node at the *end* of the queue.
prev = loop.queue.prev;
next = prev->next;
prev->next = this;
next->prev = this;
if (loop.insertPoint == &loop.queue) {
loop.insertPoint = this;
}
}
if (queueIsEmpty) {
......@@ -284,8 +302,7 @@ public:
Task(const Impl& taskSet, Own<_::PromiseNode>&& nodeParam)
: EventLoop::Event(taskSet.loop), taskSet(taskSet), node(kj::mv(nodeParam)) {
if (node->onReady(*this)) {
// TODO(soon): Only yield cross-thread.
arm(EventLoop::Event::YIELD);
arm();
}
}
......@@ -361,8 +378,7 @@ bool PromiseNode::atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Even
}
}
void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent,
EventLoop::Event::Schedule schedule) {
void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent) {
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjunction with atomicOnReady().
......@@ -370,7 +386,7 @@ void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent,
EventLoop::Event* oldEvent = nullptr;
if (!__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, _kJ_ALREADY_READY, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
oldEvent->arm(schedule);
oldEvent->arm();
}
}
......@@ -439,8 +455,7 @@ ForkBranchBase::~ForkBranchBase() noexcept(false) {
}
void ForkBranchBase::hubReady() noexcept {
// TODO(soon): This should only yield if queuing cross-thread.
atomicReady(onReadyEvent, EventLoop::Event::YIELD);
atomicReady(onReadyEvent);
}
void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
......@@ -467,9 +482,7 @@ ForkHubBase::ForkHubBase(const EventLoop& loop, Own<PromiseNode>&& inner,
ExceptionOrValue& resultRef)
: EventLoop::Event(loop), inner(kj::mv(inner)), resultRef(resultRef) {
KJ_DREQUIRE(this->inner->isSafeEventLoop(loop));
// TODO(soon): This should only yield if queuing cross-thread.
arm(YIELD);
arm();
}
ForkHubBase::~ForkHubBase() noexcept(false) {
......@@ -503,10 +516,10 @@ void ForkHubBase::fire() {
// -------------------------------------------------------------------
ChainPromiseNode::ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner, Schedule schedule)
ChainPromiseNode::ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner)
: Event(loop), state(PRE_STEP1), inner(kj::mv(inner)) {
KJ_DREQUIRE(this->inner->isSafeEventLoop(loop));
arm(schedule);
arm();
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {
......@@ -573,7 +586,7 @@ void ChainPromiseNode::fire() {
if (onReadyEvent != nullptr) {
if (inner->onReady(*onReadyEvent)) {
onReadyEvent->arm(PREEMPT);
onReadyEvent->arm();
}
}
}
......@@ -584,12 +597,7 @@ CrossThreadPromiseNodeBase::CrossThreadPromiseNodeBase(
const EventLoop& loop, Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef)
: Event(loop), dependency(kj::mv(dependency)), resultRef(resultRef) {
KJ_DREQUIRE(this->dependency->isSafeEventLoop(loop));
// The constructor may be called from any thread, so before we can even call onReady() we need
// to switch threads. We yield here so that the event is added to the end of the queue, which
// ensures that multiple events added in sequence are added in order. If we used PREEMPT, events
// we queue cross-thread would end up executing in a non-deterministic order.
arm(YIELD);
arm();
}
CrossThreadPromiseNodeBase::~CrossThreadPromiseNodeBase() noexcept(false) {
......@@ -616,7 +624,7 @@ void CrossThreadPromiseNodeBase::fire() {
}
// If onReadyEvent is null, set it to _kJ_ALREADY_READY. Otherwise, arm it.
PromiseNode::atomicReady(onReadyEvent, YIELD);
PromiseNode::atomicReady(onReadyEvent);
}
}
......
......@@ -228,6 +228,9 @@ public:
static EventLoop& current();
// Get the event loop for the current thread. Throws an exception if no event loop is active.
bool isCurrent() const;
// Is this EventLoop the current one for this thread?
template <typename T>
T wait(Promise<T>&& promise);
// Run the event loop until the promise is fulfilled, then return its result. If the promise
......@@ -299,22 +302,20 @@ public:
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
enum Schedule {
PREEMPT,
// The event gets added to the front of the queue, so that it runs immediately after the
// event that is currently running, even if other events were armed before that. If one
// event's firing arms multiple other events, those events still occur in the order in which
// they were armed. PREEMPT should generally only be used when arming an event to run in
// the current thread.
YIELD
// The event gets added to the end of the queue, so that it runs after all other events
// currently scheduled.
};
void arm(Schedule schedule);
void arm(bool preemptIfSameThread = true);
// Enqueue this event so that fire() will be called from the event loop soon. Does nothing
// if the event is already armed.
//
// If called from the event loop's own thread (i.e. from within an event handler fired from
// this event loop), and `preemptIfSameThread` is true, the event will be scheduled
// "preemptively": it fires before any event that was already in the queue when the current
// event callback started. This ensures that chains of events that trigger each other occur
// quickly together, and reduces the unpredictability possible from other event callbacks
// running between them.
//
// If called from a different thread (or `preemptIfSameThread` is false), the event is placed
// at the end of the queue, to ensure that multiple events queued cross-thread fire in the
// order in which they were queued.
void disarm();
// Cancel this event if it is armed. If it is already running, block until it finishes
......@@ -378,13 +379,17 @@ private:
// callback. This keeps related events together.
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler,
Event::Schedule schedule) const;
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const;
// Shared implementation of there() and Promise::then().
void waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result);
// Run the event loop until `node` is fulfilled, and then `get()` its result into `result`.
Promise<void> yieldIfSameThread() const;
// If called from the event loop's thread, returns a promise that won't resolve until all events
// currently on the queue are fired. Otherwise, returns an already-resolved promise. Used to
// implement evalLater().
template <typename>
friend class Promise;
};
......@@ -577,19 +582,22 @@ public:
// as soon as possible, because the promise it returns might be for a newly-scheduled
// long-running asynchronous task.
//
// On the gripping hand, `EventLoop::there()` is _always_ proactive about evaluating `func`. This
// is because `there()` is commonly used to schedule a long-running computation on another thread.
// It is important that such a computation begin as soon as possible, even if no one is yet
// waiting for the result.
//
// As another optimization, when a callback function registered with `then()` is actually
// scheduled, it is scheduled to occur immediately, preempting other work in the event queue.
// This allows a long chain of `then`s to execute all at once, improving cache locality by
// clustering operations on the same data. However, this implies that starvation can occur
// if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
// actual I/O. To solve this, use `EventLoop::current()`'s `evalLater()` or `there()` methods
// to yield control; this way, all other events in the queue will get a chance to run before your
// callback is executed.
// actual I/O. To solve this, use `EventLoop::current()`'s `evalLater()` method to yield
// control; this way, all other events in the queue will get a chance to run before your callback
// is executed.
//
// `EventLoop::there()` behaves like `then()` when called on the current thread's EventLoop.
// However, when used to schedule work on some other thread's loop, `there()` does _not_ schedule
// preemptively as this would make the ordering of events unpredictable. Also, again only when
// scheduling cross-thread, `there()` will always evaluate the continuation eagerly even if
// nothing is waiting on the returned promise, on the assumption that it is being used to
// schedule a long-running computation in another thread. In other words, when scheduling
// cross-thread, both of the "optimizations" described above are avoided.
template <typename Func, typename ErrorFunc = _::PropagateException>
PromiseForResultNoChaining<Func, T> thenInAnyThread(
......@@ -897,7 +905,7 @@ protected:
// If onReadyEvent is _kJ_ALREADY_READY, return true.
// Useful for implementing onReady() thread-safely.
static void atomicReady(EventLoop::Event*& onReadyEvent, EventLoop::Event::Schedule schedule);
static void atomicReady(EventLoop::Event*& onReadyEvent);
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjunction with atomicOnReady().
......@@ -1110,7 +1118,7 @@ inline const ExceptionOrValue& ForkBranchBase::getHubResultRef() const {
class ChainPromiseNode final: public PromiseNode, private EventLoop::Event {
public:
ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner, Schedule schedule);
ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
bool onReady(EventLoop::Event& event) noexcept override;
......@@ -1136,20 +1144,18 @@ private:
};
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, const EventLoop& loop,
EventLoop::Event::Schedule schedule, Promise<T>*) {
return heap<ChainPromiseNode>(loop, kj::mv(node), schedule);
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, Promise<T>*) {
return heap<ChainPromiseNode>(loop, kj::mv(node));
}
template <typename T>
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, const EventLoop& loop,
EventLoop::Event::Schedule schedule, T*) {
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, T*) {
return kj::mv(node);
}
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) {
return heap<ChainPromiseNode>(EventLoop::current(), kj::mv(node), EventLoop::Event::PREEMPT);
return heap<ChainPromiseNode>(EventLoop::current(), kj::mv(node));
}
template <typename T>
......@@ -1224,7 +1230,7 @@ public:
protected:
inline void setReady() {
PromiseNode::atomicReady(onReadyEvent, EventLoop::Event::PREEMPT);
PromiseNode::atomicReady(onReadyEvent);
}
private:
......@@ -1297,29 +1303,34 @@ T EventLoop::wait(Promise<T>&& promise) {
template <typename Func>
auto EventLoop::evalLater(Func&& func) const -> PromiseForResult<Func, void> {
return there(Promise<void>(READY_NOW), kj::fwd<Func>(func));
// Invoke thereImpl() on yieldIfSameThread(). Always spark the result.
return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
thereImpl(yieldIfSameThread(), kj::fwd<Func>(func), _::PropagateException()),
*this));
}
template <typename T, typename Func, typename ErrorFunc>
PromiseForResult<Func, T> EventLoop::there(
Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const {
return PromiseForResult<Func, T>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, T>>>>(thereImpl(
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler), Event::YIELD),
*this));
Own<_::PromiseNode> node = thereImpl(
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
if (!isCurrent()) {
node = _::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, T>>>>(kj::mv(node), *this);
}
return PromiseForResult<Func, T>(false, kj::mv(node));
}
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> EventLoop::thereImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler,
Event::Schedule schedule) const {
ErrorFunc&& errorHandler) const {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
*this, _::makeSafeForLoop<_::FixVoid<T>>(kj::mv(promise.node), *this),
kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return _::maybeChain(kj::mv(intermediate), *this, schedule, implicitCast<ResultT*>(nullptr));
return _::maybeChain(kj::mv(intermediate), *this, implicitCast<ResultT*>(nullptr));
}
template <typename T>
......@@ -1334,8 +1345,7 @@ template <typename T>
template <typename Func, typename ErrorFunc>
PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) {
return PromiseForResult<Func, T>(false, EventLoop::current().thereImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
EventLoop::Event::PREEMPT));
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)));
}
template <typename T>
......@@ -1504,8 +1514,7 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller(const EventLoop& loop) {
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
Promise<_::JoinPromises<T>> promise(false,
_::maybeChain(kj::mv(intermediate), loop, EventLoop::Event::YIELD,
implicitCast<T*>(nullptr)));
_::maybeChain(kj::mv(intermediate), loop, implicitCast<T*>(nullptr)));
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment