Unverified Commit 4f24d8fc authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #871 from capnproto/eval-last

Add `kj::evalLast()` for running a callback after all other events are done.
parents 21cee3c4 05c139d1
...@@ -2895,9 +2895,9 @@ private: ...@@ -2895,9 +2895,9 @@ private:
EmbargoId embargoId = context.getSenderLoopback(); EmbargoId embargoId = context.getSenderLoopback();
// We need to insert an evalLater() here to make sure that any pending calls towards this // We need to insert an evalLast() here to make sure that any pending calls towards this
// cap have had time to find their way through the event loop. // cap have had time to find their way through the event loop.
tasks.add(kj::evalLater(kj::mvCapture( tasks.add(kj::evalLast(kj::mvCapture(
target, [this,embargoId](kj::Own<ClientHook>&& target) { target, [this,embargoId](kj::Own<ClientHook>&& target) {
if (!connection.is<Connected>()) { if (!connection.is<Connected>()) {
return; return;
......
...@@ -136,6 +136,10 @@ public: ...@@ -136,6 +136,10 @@ public:
void armBreadthFirst(); void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue. // Like `armDepthFirst()` except that the event is placed at the end of the queue.
void armLast();
// Enqueues this event to happen after all other events have run to completion and there is
// really nothing left to do except wait for I/O.
void disarm(); void disarm();
// If the event is armed but hasn't fired, cancel it. (Destroying the event does this // If the event is armed but hasn't fired, cancel it. (Destroying the event does this
// implicitly.) // implicitly.)
...@@ -973,6 +977,11 @@ inline PromiseForResult<Func, void> evalLater(Func&& func) { ...@@ -973,6 +977,11 @@ inline PromiseForResult<Func, void> evalLater(Func&& func) {
return _::yield().then(kj::fwd<Func>(func), _::PropagateException()); return _::yield().then(kj::fwd<Func>(func), _::PropagateException());
} }
template <typename Func>
inline PromiseForResult<Func, void> evalLast(Func&& func) {
return _::yieldHarder().then(kj::fwd<Func>(func), _::PropagateException());
}
template <typename Func> template <typename Func>
inline PromiseForResult<Func, void> evalNow(Func&& func) { inline PromiseForResult<Func, void> evalNow(Func&& func) {
PromiseForResult<Func, void> result = nullptr; PromiseForResult<Func, void> result = nullptr;
......
...@@ -220,6 +220,7 @@ void detach(kj::Promise<void>&& promise); ...@@ -220,6 +220,7 @@ void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope); void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope); bool pollImpl(_::PromiseNode& node, WaitScope& waitScope);
Promise<void> yield(); Promise<void> yield();
Promise<void> yieldHarder();
Own<PromiseNode> neverDone(); Own<PromiseNode> neverDone();
class NeverDone { class NeverDone {
......
...@@ -379,57 +379,75 @@ TEST(Async, Ordering) { ...@@ -379,57 +379,75 @@ TEST(Async, Ordering) {
EventLoop loop; EventLoop loop;
WaitScope waitScope(loop); WaitScope waitScope(loop);
class ErrorHandlerImpl: public TaskSet::ErrorHandler {
public:
void taskFailed(kj::Exception&& exception) override {
KJ_FAIL_EXPECT(exception);
}
};
int counter = 0; int counter = 0;
Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr}; ErrorHandlerImpl errorHandler;
kj::TaskSet tasks(errorHandler);
promises[1] = evalLater([&]() { tasks.add(evalLater([&]() {
EXPECT_EQ(0, counter++); EXPECT_EQ(0, counter++);
{ {
// Use a promise and fulfiller so that we can fulfill the promise after waiting on it in // Use a promise and fulfiller so that we can fulfill the promise after waiting on it in
// order to induce depth-first scheduling. // order to induce depth-first scheduling.
auto paf = kj::newPromiseAndFulfiller<void>(); auto paf = kj::newPromiseAndFulfiller<void>();
promises[2] = paf.promise.then([&]() { tasks.add(paf.promise.then([&]() {
EXPECT_EQ(1, counter++); EXPECT_EQ(1, counter++);
}).eagerlyEvaluate(nullptr); }));
paf.fulfiller->fulfill(); paf.fulfiller->fulfill();
} }
// .then() is scheduled breadth-first if the promise has already resolved, but depth-first // .then() is scheduled breadth-first if the promise has already resolved, but depth-first
// if the promise resolves later. // if the promise resolves later.
promises[3] = Promise<void>(READY_NOW).then([&]() { tasks.add(Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(4, counter++); EXPECT_EQ(4, counter++);
}).then([&]() { }).then([&]() {
EXPECT_EQ(5, counter++); EXPECT_EQ(5, counter++);
}).eagerlyEvaluate(nullptr); tasks.add(kj::evalLast([&]() {
EXPECT_EQ(7, counter++);
tasks.add(kj::evalLater([&]() {
EXPECT_EQ(8, counter++);
}));
}));
}));
{ {
auto paf = kj::newPromiseAndFulfiller<void>(); auto paf = kj::newPromiseAndFulfiller<void>();
promises[4] = paf.promise.then([&]() { tasks.add(paf.promise.then([&]() {
EXPECT_EQ(2, counter++); EXPECT_EQ(2, counter++);
}).eagerlyEvaluate(nullptr); tasks.add(kj::evalLast([&]() {
EXPECT_EQ(9, counter++);
tasks.add(kj::evalLater([&]() {
EXPECT_EQ(10, counter++);
}));
}));
}));
paf.fulfiller->fulfill(); paf.fulfiller->fulfill();
} }
// evalLater() is like READY_NOW.then(). // evalLater() is like READY_NOW.then().
promises[5] = evalLater([&]() { tasks.add(evalLater([&]() {
EXPECT_EQ(6, counter++); EXPECT_EQ(6, counter++);
}).eagerlyEvaluate(nullptr); }));
}).eagerlyEvaluate(nullptr); }));
promises[0] = evalLater([&]() { tasks.add(evalLater([&]() {
EXPECT_EQ(3, counter++); EXPECT_EQ(3, counter++);
// Making this a chain should NOT cause it to preempt promises[1]. (This was a problem at one // Making this a chain should NOT cause it to preempt the first promise. (This was a problem
// point.) // at one point.)
return Promise<void>(READY_NOW); return Promise<void>(READY_NOW);
}).eagerlyEvaluate(nullptr); }));
for (auto i: indices(promises)) { tasks.onEmpty().wait(waitScope);
kj::mv(promises[i]).wait(waitScope);
}
EXPECT_EQ(7, counter); EXPECT_EQ(11, counter);
} }
TEST(Async, Fork) { TEST(Async, Fork) {
......
...@@ -78,6 +78,16 @@ public: ...@@ -78,6 +78,16 @@ public:
} }
}; };
class YieldHarderPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event* event) noexcept override {
if (event) event->armLast();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
};
class NeverDonePromiseNode final: public _::PromiseNode { class NeverDonePromiseNode final: public _::PromiseNode {
public: public:
void onReady(_::Event* event) noexcept override { void onReady(_::Event* event) noexcept override {
...@@ -750,6 +760,9 @@ bool EventLoop::turn() { ...@@ -750,6 +760,9 @@ bool EventLoop::turn() {
} }
depthFirstInsertPoint = &head; depthFirstInsertPoint = &head;
if (breadthFirstInsertPoint == &event->next) {
breadthFirstInsertPoint = &head;
}
if (tail == &event->next) { if (tail == &event->next) {
tail = &head; tail = &head;
} }
...@@ -921,6 +934,10 @@ Promise<void> yield() { ...@@ -921,6 +934,10 @@ Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>()); return Promise<void>(false, kj::heap<YieldPromiseNode>());
} }
Promise<void> yieldHarder() {
return Promise<void>(false, kj::heap<YieldHarderPromiseNode>());
}
Own<PromiseNode> neverDone() { Own<PromiseNode> neverDone() {
return kj::heap<NeverDonePromiseNode>(); return kj::heap<NeverDonePromiseNode>();
} }
...@@ -964,6 +981,9 @@ void Event::armDepthFirst() { ...@@ -964,6 +981,9 @@ void Event::armDepthFirst() {
loop.depthFirstInsertPoint = &next; loop.depthFirstInsertPoint = &next;
if (loop.breadthFirstInsertPoint == prev) {
loop.breadthFirstInsertPoint = &next;
}
if (loop.tail == prev) { if (loop.tail == prev) {
loop.tail = &next; loop.tail = &next;
} }
...@@ -978,14 +998,42 @@ void Event::armBreadthFirst() { ...@@ -978,14 +998,42 @@ void Event::armBreadthFirst() {
"Executor to queue events cross-thread."); "Executor to queue events cross-thread.");
if (prev == nullptr) { if (prev == nullptr) {
next = *loop.tail; next = *loop.breadthFirstInsertPoint;
prev = loop.tail; prev = loop.breadthFirstInsertPoint;
*prev = this; *prev = this;
if (next != nullptr) { if (next != nullptr) {
next->prev = &next; next->prev = &next;
} }
loop.breadthFirstInsertPoint = &next;
if (loop.tail == prev) {
loop.tail = &next; loop.tail = &next;
}
loop.setRunnable(true);
}
}
void Event::armLast() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use "
"Executor to queue events cross-thread.");
if (prev == nullptr) {
next = *loop.breadthFirstInsertPoint;
prev = loop.breadthFirstInsertPoint;
*prev = this;
if (next != nullptr) {
next->prev = &next;
}
// We don't update loop.breadthFirstInsertPoint because we want further inserts to go *before*
// this event.
if (loop.tail == prev) {
loop.tail = &next;
}
loop.setRunnable(true); loop.setRunnable(true);
} }
...@@ -1005,6 +1053,9 @@ void Event::disarm() { ...@@ -1005,6 +1053,9 @@ void Event::disarm() {
if (loop.depthFirstInsertPoint == &next) { if (loop.depthFirstInsertPoint == &next) {
loop.depthFirstInsertPoint = prev; loop.depthFirstInsertPoint = prev;
} }
if (loop.breadthFirstInsertPoint == &next) {
loop.breadthFirstInsertPoint = prev;
}
*prev = next; *prev = next;
if (next != nullptr) { if (next != nullptr) {
......
...@@ -318,6 +318,7 @@ private: ...@@ -318,6 +318,7 @@ private:
friend class _::ForkHub; friend class _::ForkHub;
friend class TaskSet; friend class TaskSet;
friend Promise<void> _::yield(); friend Promise<void> _::yield();
friend Promise<void> _::yieldHarder();
friend class _::NeverDone; friend class _::NeverDone;
template <typename U> template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises); friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
...@@ -379,6 +380,25 @@ PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT; ...@@ -379,6 +380,25 @@ PromiseForResult<Func, void> evalNow(Func&& func) KJ_WARN_UNUSED_RESULT;
// If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the // If `func()` throws an exception, the exception is caught and wrapped in a promise -- this is the
// main reason why `evalNow()` is useful. // main reason why `evalNow()` is useful.
template <typename Func>
PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT;
// Like `evalLater()`, except that the function doesn't run until the event queue is otherwise
// completely empty and the thread is about to suspend waiting for I/O.
//
// This is useful when you need to perform some disruptive action and you want to make sure that
// you don't interrupt some other task between two .then() continuations. For example, say you want
// to cancel a read() operation on a socket and know for sure that if any bytes were read, you saw
// them. It could be that a read() has completed and bytes have been transferred to the target
// buffer, but the .then() callback that handles the read result hasn't executed yet. If you
// cancel the promise at this inopportune moment, the bytes in the buffer are lost. If you do
// evalLast(), then you can be sure that any pending .then() callbacks had a chance to finish out
// and if you didn't receive the read result yet, then you know nothing has been read, and you can
// simply drop the promise.
//
// If evalLast() is called multiple times, functions are executed in LIFO order. If the first
// callback enqueues new events, then latter callbacks will not execute until those events are
// drained.
template <typename T> template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises); Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
// Join an array of promises into a promise for an array. // Join an array of promises into a promise for an array.
...@@ -866,6 +886,7 @@ private: ...@@ -866,6 +886,7 @@ private:
_::Event* head = nullptr; _::Event* head = nullptr;
_::Event** tail = &head; _::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head; _::Event** depthFirstInsertPoint = &head;
_::Event** breadthFirstInsertPoint = &head;
kj::Maybe<Executor> executor; kj::Maybe<Executor> executor;
// Allocated the first time getExecutor() is requested, making cross-thread request possible. // Allocated the first time getExecutor() is requested, making cross-thread request possible.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment