Commit f8142fc7 authored by Kenton Varda's avatar Kenton Varda

Add kj::evalLast() for running a callback after all other events are done.

parent 21cee3c4
......@@ -136,6 +136,10 @@ public:
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
void armLast();
// Enqueues this event to happen after all other events have run to completion and there is
// really nothing left to do except wait for I/O.
void disarm();
// If the event is armed but hasn't fired, cancel it. (Destroying the event does this
// implicitly.)
......@@ -973,6 +977,11 @@ inline PromiseForResult<Func, void> evalLater(Func&& func) {
return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
}
template <typename Func>
inline PromiseForResult<Func, void> evalLast(Func&& func) {
return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
}
template <typename Func>
inline PromiseForResult<Func, void> evalNow(Func&& func) {
PromiseForResult<Func, void> result = nullptr;
......
......@@ -220,6 +220,7 @@ void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope);
Promise<void> yield();
Promise<void> yieldHarder();
Own<PromiseNode> neverDone();
class NeverDone {
......
......@@ -380,7 +380,9 @@ TEST(Async, Ordering) {
WaitScope waitScope(loop);
int counter = 0;
Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr};
Promise<void> promises[10] = {
nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr
};
promises[1] = evalLater([&]() {
EXPECT_EQ(0, counter++);
......@@ -401,12 +403,24 @@ TEST(Async, Ordering) {
EXPECT_EQ(4, counter++);
}).then([&]() {
EXPECT_EQ(5, counter++);
promises[8] = kj::evalLast([&]() {
EXPECT_EQ(9, counter++);
promises[9] = kj::evalLater([&]() {
EXPECT_EQ(10, counter++);
});
});
}).eagerlyEvaluate(nullptr);
{
auto paf = kj::newPromiseAndFulfiller<void>();
promises[4] = paf.promise.then([&]() {
EXPECT_EQ(2, counter++);
promises[6] = kj::evalLast([&]() {
EXPECT_EQ(7, counter++);
promises[7] = kj::evalLater([&]() {
EXPECT_EQ(8, counter++);
});
});
}).eagerlyEvaluate(nullptr);
paf.fulfiller->fulfill();
}
......@@ -429,7 +443,7 @@ TEST(Async, Ordering) {
kj::mv(promises[i]).wait(waitScope);
}
EXPECT_EQ(7, counter);
EXPECT_EQ(11, counter);
}
TEST(Async, Fork) {
......
......@@ -78,6 +78,16 @@ public:
}
};
class YieldHarderPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
if (event) event->armLast();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
};
class NeverDonePromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
......@@ -750,6 +760,9 @@ bool EventLoop::turn() {
}
depthFirstInsertPoint = &head;
if (breadthFirstInsertPoint == &event->next) {
breadthFirstInsertPoint = &head;
}
if (tail == &event->next) {
tail = &head;
}
......@@ -921,6 +934,10 @@ Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
Promise<void> yieldHarder() {
return Promise<void>(false, kj::heap<YieldHarderPromiseNode>());
}
Own<PromiseNode> neverDone() {
return kj::heap<NeverDonePromiseNode>();
}
......@@ -964,6 +981,9 @@ void Event::armDepthFirst() {
loop.depthFirstInsertPoint = &next;
if (loop.breadthFirstInsertPoint == prev) {
loop.breadthFirstInsertPoint = &next;
}
if (loop.tail == prev) {
loop.tail = &next;
}
......@@ -978,14 +998,42 @@ void Event::armBreadthFirst() {
"Executor to queue events cross-thread.");
if (prev == nullptr) {
next = *loop.tail;
prev = loop.tail;
next = *loop.breadthFirstInsertPoint;
prev = loop.breadthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
loop.breadthFirstInsertPoint = &next;
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true);
}
}
void Event::armLast() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"Executor to queue events cross-thread.");
if (prev == nullptr) {
next = *loop.breadthFirstInsertPoint;
prev = loop.breadthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
loop.tail = &next;
// We don't update loop.breadthFirstInsertPoint because we want further inserts to go *before*
// this event.
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true);
}
......@@ -1005,6 +1053,9 @@ void Event::disarm() {
if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev;
}
if (loop.breadthFirstInsertPoint == &next) {
loop.breadthFirstInsertPoint = prev;
}
*prev = next;
if (next != nullptr) {
......
......@@ -318,6 +318,7 @@ private:
friend class _::ForkHub;
friend class TaskSet;
friend Promise<void> _::yield();
friend Promise<void> _::yieldHarder();
friend class _::NeverDone;
template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
......@@ -379,6 +380,25 @@ PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT;
// If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the
// main reason why `evalNow()` is useful.
template <typename Func>
PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT;
// Like `evalLater()`, except that the function doesn't run until the event queue is otherwise
// completely empty and the thread is about to suspend waiting for I/O.
//
// This is useful when you need to perform some disruptive action and you want to make sure that
// you don't interrupt some other task between two .then() continuations. For example, say you want
// to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw
// them. It could be that a read() has completed and bytes have been transferred to the target
// buffer, but the .then() callback that handles the read result hasn't executed yet. If you
// cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do
// evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out
// and if you didn't receive the read result yet, then you know nothing has been read, and you can
// simply drop the promise.
//
// If evalLast() is called multiple times, functions are executed in LIFO order. If the first
// callback enqueues new events, then latter callbacks will not execute until those events are
// drained.
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
// Join an array of promises into a promise for an array.
......@@ -866,6 +886,7 @@ private:
_::Event* head = nullptr;
_::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head;
_::Event** breadthFirstInsertPoint = &head;
kj::Maybe<Executor> executor;
// Allocated the first time getExecutor() is requested, making cross-thread request possible.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment