Commit 3fa44ffe authored by Kenton Varda's avatar Kenton Varda

Events queued together should occur in logical order.

parent 7d1bae3c
...@@ -254,26 +254,29 @@ TEST(Async, Yield) { ...@@ -254,26 +254,29 @@ TEST(Async, Yield) {
int counter = 0; int counter = 0;
Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr}; Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr};
promises[0] = loop2.evalLater([&]() {
EXPECT_EQ(3, counter++);
});
promises[1] = loop2.evalLater([&]() { promises[1] = loop2.evalLater([&]() {
EXPECT_EQ(0, counter++); EXPECT_EQ(0, counter++);
promises[2] = loop2.evalLater([&]() { promises[2] = loop2.evalLater([&]() {
EXPECT_EQ(2, counter++); EXPECT_EQ(1, counter++);
}); });
promises[3] = loop2.there(loop2.yield(), [&]() { promises[3] = loop2.there(loop2.yield(), [&]() {
EXPECT_EQ(4, counter++); EXPECT_EQ(4, counter++);
return loop2.evalLater([&]() {
EXPECT_EQ(5, counter++);
});
}); });
promises[4] = loop2.evalLater([&]() { promises[4] = loop2.evalLater([&]() {
EXPECT_EQ(1, counter++); EXPECT_EQ(2, counter++);
}); });
promises[5] = loop2.there(loop2.yield(), [&]() { promises[5] = loop2.there(loop2.yield(), [&]() {
EXPECT_EQ(5, counter++); EXPECT_EQ(6, counter++);
}); });
}); });
promises[0] = loop2.evalLater([&]() {
EXPECT_EQ(3, counter++);
});
auto exitThread = newPromiseAndFulfiller<void>(); auto exitThread = newPromiseAndFulfiller<void>();
Thread thread([&]() { Thread thread([&]() {
...@@ -289,7 +292,7 @@ TEST(Async, Yield) { ...@@ -289,7 +292,7 @@ TEST(Async, Yield) {
loop1.wait(kj::mv(promises[i])); loop1.wait(kj::mv(promises[i]));
} }
EXPECT_EQ(6, counter); EXPECT_EQ(7, counter);
} }
} // namespace } // namespace
......
...@@ -101,7 +101,7 @@ void EventLoop::EventListHead::fire() { ...@@ -101,7 +101,7 @@ void EventLoop::EventListHead::fire() {
KJ_FAIL_ASSERT("Fired event list head."); KJ_FAIL_ASSERT("Fired event list head.");
} }
EventLoop::EventLoop(): queue(*this) { EventLoop::EventLoop(): queue(*this), insertPoint(&queue) {
queue.next = &queue; queue.next = &queue;
queue.prev = &queue; queue.prev = &queue;
} }
...@@ -133,6 +133,9 @@ void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result) ...@@ -133,6 +133,9 @@ void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result)
event->next = nullptr; event->next = nullptr;
event->prev = nullptr; event->prev = nullptr;
// New events should be inserted at the beginning of the queue, but in order.
insertPoint = queue.next;
// Lock it before we unlock the queue mutex. // Lock it before we unlock the queue mutex.
event->mutex.lock(_::Mutex::EXCLUSIVE); event->mutex.lock(_::Mutex::EXCLUSIVE);
...@@ -157,6 +160,10 @@ Promise<void> EventLoop::yield() { ...@@ -157,6 +160,10 @@ Promise<void> EventLoop::yield() {
queue.prev->next = node; queue.prev->next = node;
queue.prev = node; queue.prev = node;
if (insertPoint == &queue) {
insertPoint = node;
}
if (node->prev == &queue) { if (node->prev == &queue) {
// Queue was empty previously. Make sure to wake it up if it is sleeping. // Queue was empty previously. Make sure to wake it up if it is sleeping.
wake(); wake();
...@@ -180,7 +187,7 @@ void EventLoop::Event::arm() { ...@@ -180,7 +187,7 @@ void EventLoop::Event::arm() {
// Insert the event into the queue. We put it at the front rather than the back so that related // Insert the event into the queue. We put it at the front rather than the back so that related
// events are executed together and so that increasing the granularity of events does not cause // events are executed together and so that increasing the granularity of events does not cause
// your code to "lose priority" compared to simultaneously-running code with less granularity. // your code to "lose priority" compared to simultaneously-running code with less granularity.
next = loop.queue.next; next = loop.insertPoint;
prev = next->prev; prev = next->prev;
next->prev = this; next->prev = this;
prev->next = this; prev->next = this;
......
...@@ -333,6 +333,12 @@ private: ...@@ -333,6 +333,12 @@ private:
EventListHead queue; EventListHead queue;
Event* insertPoint;
// The next event after the one that is currently firing. New events are inserted just before
// this event. When the fire callback completes, the loop continues at the beginning of the
// queue -- thus, it starts by running any events that were just enqueued by the previous
// callback. This keeps related events together.
template <typename T, typename Func, typename ErrorFunc> template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const; Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const;
// Shared implementation of there() and Promise::then(). // Shared implementation of there() and Promise::then().
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment