Commit 9fa3eadc authored by Kenton Varda's avatar Kenton Varda

Improve and simplify event ordering. When explicitly calling an EventLoop,…

Improve and simplify event ordering.  When explicitly calling an EventLoop, events are now always added to the end of the queue, ensuring deterministic run order and making yield() obsolete.  OTOH, then() may queue events to run immediately, preempting other events.
parent 7325ed03
......@@ -247,7 +247,7 @@ TEST(Async, Threads) {
EXPECT_ANY_THROW(EventLoop::current());
}
TEST(Async, Yield) {
TEST(Async, Ordering) {
SimpleEventLoop loop1;
SimpleEventLoop loop2;
......@@ -256,25 +256,31 @@ TEST(Async, Yield) {
promises[1] = loop2.evalLater([&]() {
EXPECT_EQ(0, counter++);
promises[2] = loop2.evalLater([&]() {
promises[2] = Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(1, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
});
promises[3] = loop2.there(loop2.yield(), [&]() {
promises[3] = loop2.evalLater([&]() {
EXPECT_EQ(4, counter++);
return loop2.evalLater([&]() {
return Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(5, counter++);
});
});
promises[4] = loop2.evalLater([&]() {
promises[4] = Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(2, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
});
promises[5] = loop2.there(loop2.yield(), [&]() {
promises[5] = loop2.evalLater([&]() {
EXPECT_EQ(6, counter++);
});
});
promises[0] = loop2.evalLater([&]() {
EXPECT_EQ(3, counter++);
// Making this a chain should NOT cause it to preempt promises[1]. (This was a problem at one
// point.)
return Promise<void>(READY_NOW);
});
auto exitThread = newPromiseAndFulfiller<void>();
......
......@@ -43,40 +43,6 @@ thread_local EventLoop* threadLocalEventLoop = nullptr;
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::EventLoop::Event*>(1)
class YieldPromiseNode final: public _::PromiseNode, public EventLoop::Event {
// A PromiseNode used to implement EventLoop::yield().
public:
YieldPromiseNode(const EventLoop& loop): Event(loop) {}
~YieldPromiseNode() {
disarm();
}
bool onReady(EventLoop::Event& event) noexcept override {
if (onReadyEvent == _kJ_ALREADY_READY) {
return true;
} else {
onReadyEvent = &event;
return false;
}
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
return getEventLoop();
}
void fire() override {
if (onReadyEvent != nullptr) {
onReadyEvent->arm();
}
}
private:
EventLoop::Event* onReadyEvent = nullptr;
};
class BoolEvent: public EventLoop::Event {
public:
BoolEvent(const EventLoop& loop): Event(loop) {}
......@@ -150,49 +116,45 @@ void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result)
node->get(result);
}
Promise<void> EventLoop::yield() {
auto node = heap<YieldPromiseNode>(*this);
// Insert the node at the *end* of the queue.
queue.mutex.lock(_::Mutex::EXCLUSIVE);
node->prev = queue.prev;
node->next = &queue;
queue.prev->next = node;
queue.prev = node;
if (insertPoint == &queue) {
insertPoint = node;
}
if (node->prev == &queue) {
// Queue was empty previously. Make sure to wake it up if it is sleeping.
wake();
}
queue.mutex.unlock(_::Mutex::EXCLUSIVE);
return Promise<void>(kj::mv(node));
}
EventLoop::Event::~Event() noexcept(false) {
if (this != &loop.queue) {
KJ_ASSERT(next == nullptr || std::uncaught_exception(), "Event destroyed while armed.");
}
}
void EventLoop::Event::arm() {
void EventLoop::Event::arm(Schedule schedule) {
loop.queue.mutex.lock(_::Mutex::EXCLUSIVE);
KJ_DEFER(loop.queue.mutex.unlock(_::Mutex::EXCLUSIVE));
if (next == nullptr) {
// Insert the event into the queue. We put it at the front rather than the back so that related
// events are executed together and so that increasing the granularity of events does not cause
// your code to "lose priority" compared to simultaneously-running code with less granularity.
bool queueIsEmpty = loop.queue.next == &loop.queue;
switch (schedule) {
case PREEMPT:
// Insert the event into the queue. We put it at the front rather than the back so that
// related events are executed together and so that increasing the granularity of events
// does not cause your code to "lose priority" compared to simultaneously-running code
// with less granularity.
next = loop.insertPoint;
prev = next->prev;
next->prev = this;
prev->next = this;
break;
case YIELD:
// Insert the node at the *end* of the queue.
prev = loop.queue.prev;
next = prev->next;
prev->next = this;
next->prev = this;
if (next == &loop.queue) {
if (loop.insertPoint == &loop.queue) {
loop.insertPoint = this;
}
break;
}
if (queueIsEmpty) {
// Queue was empty previously. Make sure to wake it up if it is sleeping.
loop.wake();
}
......@@ -203,6 +165,10 @@ void EventLoop::Event::disarm() {
if (next != nullptr) {
loop.queue.mutex.lock(_::Mutex::EXCLUSIVE);
if (loop.insertPoint == this) {
loop.insertPoint = next;
}
next->prev = prev;
prev->next = next;
next = nullptr;
......@@ -317,7 +283,8 @@ bool PromiseNode::atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Even
}
}
void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent) {
void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent,
EventLoop::Event::Schedule schedule) {
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjuction with atomicOnReady().
......@@ -325,7 +292,7 @@ void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent) {
EventLoop::Event* oldEvent = nullptr;
if (!__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, _kJ_ALREADY_READY, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
oldEvent->arm();
oldEvent->arm(schedule);
}
}
......@@ -359,10 +326,10 @@ Maybe<const EventLoop&> TransformPromiseNodeBase::getSafeEventLoop() noexcept {
return loop;
}
ChainPromiseNode::ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner)
ChainPromiseNode::ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner, Schedule schedule)
: Event(loop), state(PRE_STEP1), inner(kj::mv(inner)) {
KJ_DREQUIRE(this->inner->isSafeEventLoop(loop));
arm();
arm(schedule);
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {
......@@ -429,7 +396,7 @@ void ChainPromiseNode::fire() {
if (onReadyEvent != nullptr) {
if (inner->onReady(*onReadyEvent)) {
onReadyEvent->arm();
onReadyEvent->arm(PREEMPT);
}
}
}
......@@ -440,8 +407,10 @@ CrossThreadPromiseNodeBase::CrossThreadPromiseNodeBase(
KJ_DREQUIRE(this->dependent->isSafeEventLoop(loop));
// The constructor may be called from any thread, so before we can even call onReady() we need
// to switch threads.
arm();
// to switch threads. We yield here so that the event is added to the end of the queue, which
// ensures that multiple events added in sequence are added in order. If we used PREEMPT, events
// we queue cross-thread would end up executing in a non-deterministic order.
arm(YIELD);
}
CrossThreadPromiseNodeBase::~CrossThreadPromiseNodeBase() noexcept(false) {
......@@ -468,7 +437,7 @@ void CrossThreadPromiseNodeBase::fire() {
}
// If onReadyEvent is null, set it to _kJ_ALREADY_READY. Otherwise, arm it.
PromiseNode::atomicReady(onReadyEvent);
PromiseNode::atomicReady(onReadyEvent, YIELD);
}
}
......
......@@ -219,18 +219,6 @@ public:
// non-fatal exception and then recovers by returning a dummy value, wait() will also throw a
// non-fatal exception and return the same dummy value.
Promise<void> yield();
// Returns a promise which is fulfilled when all work currently in the queue has completed.
// Note that this doesn't necessarily mean the queue is empty at that point -- if you call
// `yield()` twice, the promise from the first call will be fulfilled before the one returned
// by the second call.
//
// Note that `yield()` is the only way to add events to the _end_ of the queue. When a promise
// is fulfilled and some other promise is waiting on it, the `then` callback for that promise
// actually goes onto the _beginning_ of the queue, so that related callbacks occur together and
// splitting a task into finer-grained callbacks does not cause the task to "lose priority"
// compared to other tasks occurring concurrently.
template <typename Func>
auto evalLater(Func&& func) const -> PromiseForResult<Func, void>;
// Schedule for the given zero-parameter function to be executed in the event loop at some
......@@ -244,8 +232,9 @@ public:
// If the returned promise is destroyed while the callback is running in another thread, the
// destructor will block until the callback completes.
//
// `evalLater()` is largely equivalent to `there()` called on an already-fulfilled
// `Promise<Void>`.
// If you schedule several evaluations with `evalLater`, they will be executed in order.
//
// `evalLater()` is equivalent to `there()` chained on `Promise<void>(READY_NOW)`.
template <typename T, typename Func, typename ErrorFunc = _::PropagateException>
auto there(Promise<T>&& promise, Func&& func,
......@@ -277,8 +266,22 @@ public:
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void arm();
// Enqueue this event so that run() will be called from the event loop soon.
enum Schedule {
PREEMPT,
// The event gets added to the front of the queue, so that it runs immediately after the
// event that is currently running, even if other events were armed before that. If one
// event's firing arms multiple other events, those events still occur in the order in which
// they were armed. PREEMPT should generally only be used when arming an event to run in
// the current thread.
YIELD
// The event gets added to the end of the queue, so that it runs after all other events
// currently scheduled.
};
void arm(Schedule schedule);
// Enqueue this event so that run() will be called from the event loop soon. Does nothing
// if the event is already armed.
void disarm();
// Cancel this event if it is armed. If it is already running, block until it finishes
......@@ -331,16 +334,19 @@ private:
void fire() override; // throws
};
EventListHead queue;
mutable EventListHead queue;
// Head of the event list. queue.mutex protects all next/prev pointers across the list, as well
// as `insertPoint`. Each actual event's mutex protects its own `fire()` callback.
Event* insertPoint;
mutable Event* insertPoint;
// The next event after the one that is currently firing. New events are inserted just before
// this event. When the fire callback completes, the loop continues at the beginning of the
// queue -- thus, it starts by running any events that were just enqueued by the previous
// callback. This keeps related events together.
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const;
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler,
Event::Schedule schedule) const;
// Shared implementation of there() and Promise::then().
void waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result);
......@@ -484,6 +490,9 @@ public:
// Note that `then()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid, but `then()` returns a new promise.
//
// *Advanced implementation tips:* Most users will never need to worry about the below, but
// it is good to be aware of.
//
// As an optimization, if the callback function `func` does _not_ return another promise, then
// execution of `func` itself may be delayed until its result is known to be needed. The
// here expectation is that `func` is just doing some transformation on the results, not
......@@ -500,7 +509,14 @@ public:
// It is important that such a computation begin as soon as possible, even if no one is yet
// waiting for the result.
//
// In most cases, none of the above makes a difference and you need not worry about it.
// As another optimization, when a callback function registered with `then()` is actually
// scheduled, it is scheduled to occur immediately, preempting other work in the event queue.
// This allows a long chain of `then`s to execute all at once, improving cache locality by
// clustering operations on the same data. However, this implies that starvation can occur
// if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
// actual I/O. To solve this, use `EventLoop::current()`'s `evalLater()` or `there()` methods
// to yield control; this way, all other events in the queue will get a chance to run before your
// callback is executed.
T wait();
// Equivalent to `EventLoop::current().wait(kj::mv(*this))`. WARNING: Although `wait()`
......@@ -555,6 +571,9 @@ public:
template <>
class PromiseFulfiller<void> {
// Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>.
//
// It is safe to call a PromiseFulfiller from any thread, as long as you only call it from one
// thread at a time.
public:
virtual void fulfill(_::Void&& value = _::Void()) = 0;
......@@ -683,7 +702,7 @@ protected:
// If onReadyEvent is _kJ_ALREADY_READY, return true.
// Useful for implementing onReady() thread-safely.
static void atomicReady(EventLoop::Event*& onReadyEvent);
static void atomicReady(EventLoop::Event*& onReadyEvent, EventLoop::Event::Schedule schedule);
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjuction with atomicOnReady().
......@@ -774,7 +793,7 @@ private:
class ChainPromiseNode final: public PromiseNode, private EventLoop::Event {
public:
ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner);
ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner, Schedule schedule);
~ChainPromiseNode() noexcept(false);
bool onReady(EventLoop::Event& event) noexcept override;
......@@ -800,12 +819,14 @@ private:
};
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, Promise<T>*) {
return heap<ChainPromiseNode>(loop, kj::mv(node));
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, const EventLoop& loop,
EventLoop::Event::Schedule schedule, Promise<T>*) {
return heap<ChainPromiseNode>(loop, kj::mv(node), schedule);
}
template <typename T>
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, T*) {
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, const EventLoop& loop,
EventLoop::Event::Schedule schedule, T*) {
return kj::mv(node);
}
......@@ -872,7 +893,7 @@ public:
protected:
inline void setReady() {
PromiseNode::atomicReady(onReadyEvent);
PromiseNode::atomicReady(onReadyEvent, EventLoop::Event::PREEMPT);
}
private:
......@@ -946,19 +967,20 @@ template <typename T, typename Func, typename ErrorFunc>
auto EventLoop::there(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const
-> PromiseForResult<Func, T> {
return _::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, T>>>>(thereImpl(
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)), *this);
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler), Event::YIELD), *this);
}
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> EventLoop::thereImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler) const {
ErrorFunc&& errorHandler,
Event::Schedule schedule) const {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
*this, _::makeSafeForLoop<_::FixVoid<T>>(kj::mv(promise.node), *this),
kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return _::maybeChain(kj::mv(intermediate), *this, implicitCast<ResultT*>(nullptr));
return _::maybeChain(kj::mv(intermediate), *this, schedule, implicitCast<ResultT*>(nullptr));
}
template <typename T>
......@@ -969,7 +991,8 @@ template <typename T>
template <typename Func, typename ErrorFunc>
auto Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) -> PromiseForResult<Func, T> {
return EventLoop::current().thereImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler),
EventLoop::Event::PREEMPT);
}
template <typename T>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment