Commit 793b2af2 authored by Kenton Varda's avatar Kenton Varda

Tweak scheduling.

parent 222c3b20
......@@ -133,8 +133,8 @@ class PromiseNode {
// internal implementation details.
public:
virtual bool onReady(Event& event) noexcept = 0;
// Returns true if already ready, otherwise arms the given event when ready.
virtual void onReady(Event& event) noexcept = 0;
// Arms the given event when ready.
virtual void get(ExceptionOrValue& output) noexcept = 0;
// Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
......@@ -150,7 +150,7 @@ protected:
// Helper class for implementing onReady().
public:
bool init(Event& newEvent);
void init(Event& newEvent);
// Returns true if arm() was already called.
void arm();
......@@ -166,7 +166,7 @@ protected:
class ImmediatePromiseNodeBase: public PromiseNode {
public:
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
};
template <typename T>
......@@ -200,7 +200,7 @@ class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -239,7 +239,7 @@ class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -309,7 +309,7 @@ public:
// Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override;
protected:
......@@ -401,7 +401,7 @@ public:
explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -439,7 +439,7 @@ public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -474,7 +474,7 @@ class EagerPromiseNodeBase: public PromiseNode, protected Event {
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
......@@ -511,7 +511,7 @@ Own<PromiseNode> spark(Own<PromiseNode>&& node) {
class AdapterPromiseNodeBase: public PromiseNode {
public:
bool onReady(Event& event) noexcept override;
void onReady(Event& event) noexcept override;
protected:
inline void setReady() {
......@@ -588,10 +588,9 @@ T EventLoop::wait(Promise<T>&& promise) {
template <typename Func>
PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) {
// Invoke thenImpl() on yield(). Always spark the result.
// Invoke thenImpl() on yield().
return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
thenImpl(yield(), kj::fwd<Func>(func), _::PropagateException())));
thenImpl(yield(), kj::fwd<Func>(func), _::PropagateException()));
}
template <typename T, typename Func, typename ErrorFunc>
......
......@@ -240,24 +240,43 @@ TEST(Async, Ordering) {
promises[1] = evalLater([&]() {
EXPECT_EQ(0, counter++);
promises[2] = Promise<void>(READY_NOW).then([&]() {
{
// Use a promise and fulfiller so that we can fulfill the promise after waiting on it in
// order to induce depth-first scheduling.
auto paf = kj::newPromiseAndFulfiller<void>();
promises[2] = paf.promise.then([&]() {
EXPECT_EQ(1, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
});
promises[3] = evalLater([&]() {
promises[2].eagerlyEvaluate();
paf.fulfiller->fulfill();
}
// .then() is scheduled breadth-first if the promise has already resolved, but depth-first
// if the promise resolves later.
promises[3] = Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(4, counter++);
return Promise<void>(READY_NOW).then([&]() {
}).then([&]() {
EXPECT_EQ(5, counter++);
});
});
promises[4] = Promise<void>(READY_NOW).then([&]() {
promises[3].eagerlyEvaluate();
{
auto paf = kj::newPromiseAndFulfiller<void>();
promises[4] = paf.promise.then([&]() {
EXPECT_EQ(2, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
});
promises[4].eagerlyEvaluate();
paf.fulfiller->fulfill();
}
// evalLater() is like READY_NOW.then().
promises[5] = evalLater([&]() {
EXPECT_EQ(6, counter++);
});
promises[5].eagerlyEvaluate();
});
promises[1].eagerlyEvaluate();
promises[0] = evalLater([&]() {
EXPECT_EQ(3, counter++);
......@@ -266,6 +285,7 @@ TEST(Async, Ordering) {
// point.)
return Promise<void>(READY_NOW);
});
promises[0].eagerlyEvaluate();
for (auto i: indices(promises)) {
kj::mv(promises[i]).wait();
......@@ -367,8 +387,10 @@ TEST(Async, ExclusiveJoin) {
{
SimpleEventLoop loop;
auto right = evalLater([&]() { return 456; });
auto left = evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 456; });
right.eagerlyEvaluate();
left.exclusiveJoin(kj::mv(right));
......
......@@ -58,19 +58,18 @@ public:
class YieldPromiseNode final: public _::PromiseNode {
public:
bool onReady(_::Event& event) noexcept override {
void onReady(_::Event& event) noexcept override {
event.armBreadthFirst();
return false;
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>().value = _::Void();
output.as<_::Void>() = _::Void();
}
};
class NeverReadyPromiseNode final: public _::PromiseNode {
public:
bool onReady(_::Event& event) noexcept override {
return false;
void onReady(_::Event& event) noexcept override {
// ignore
}
void get(_::ExceptionOrValue& output) noexcept override {
KJ_FAIL_REQUIRE("Not ready.");
......@@ -100,9 +99,7 @@ public:
public:
Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
: taskSet(taskSet), node(kj::mv(nodeParam)) {
if (node->onReady(*this)) {
armDepthFirst();
}
node->onReady(*this);
}
protected:
......@@ -232,7 +229,7 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result
KJ_REQUIRE(!running, "wait() is not allowed from within event callbacks.");
BoolEvent doneEvent;
doneEvent.fired = node->onReady(doneEvent);
node->onReady(doneEvent);
running = true;
KJ_DEFER(running = false);
......@@ -251,6 +248,10 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result
_::Event* event = head;
head = event->next;
if (head != nullptr) {
head->prev = &head;
}
depthFirstInsertPoint = &head;
if (tail == &event->next) {
tail = &head;
......@@ -294,9 +295,6 @@ Event::Event()
Event::~Event() noexcept(false) {
if (prev != nullptr) {
if (loop.head == this) {
loop.head = next;
}
if (loop.tail == &next) {
loop.tail = prev;
}
......@@ -489,12 +487,14 @@ kj::String PromiseBase::trace() {
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
bool PromiseNode::OnReadyEvent::init(Event& newEvent) {
void PromiseNode::OnReadyEvent::init(Event& newEvent) {
if (event == _kJ_ALREADY_READY) {
return true;
// A new continuation was added to a promise that was already ready. In this case, we schedule
// breadth-first, to make it difficult for applications to accidentally starve the event loop
// by repeatedly waiting on immediate promises.
newEvent.armBreadthFirst();
} else {
event = &newEvent;
return false;
}
}
......@@ -502,13 +502,18 @@ void PromiseNode::OnReadyEvent::arm() {
if (event == nullptr) {
event = _kJ_ALREADY_READY;
} else {
// A promise resolved and an event is already waiting on it. In this case, arm in depth-first
// order so that the event runs immediately after the current one. This way, chained promises
// execute together for better cache locality and lower latency.
event->armDepthFirst();
}
}
// -------------------------------------------------------------------
bool ImmediatePromiseNodeBase::onReady(Event& event) noexcept { return true; }
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
event.armBreadthFirst();
}
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
: exception(kj::mv(exception)) {}
......@@ -522,8 +527,8 @@ void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency)
: dependency(kj::mv(dependency)) {}
bool AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
return dependency->onReady(event);
void AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
dependency->onReady(event);
}
void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
......@@ -543,8 +548,8 @@ void AttachmentPromiseNodeBase::dropDependency() {
TransformPromiseNodeBase::TransformPromiseNodeBase(Own<PromiseNode>&& dependency)
: dependency(kj::mv(dependency)) {}
bool TransformPromiseNodeBase::onReady(Event& event) noexcept {
return dependency->onReady(event);
void TransformPromiseNodeBase::onReady(Event& event) noexcept {
dependency->onReady(event);
}
void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
......@@ -607,8 +612,8 @@ void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
}
}
bool ForkBranchBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event);
void ForkBranchBase::onReady(Event& event) noexcept {
onReadyEvent.init(event);
}
PromiseNode* ForkBranchBase::getInnerForTrace() {
......@@ -619,7 +624,7 @@ PromiseNode* ForkBranchBase::getInnerForTrace() {
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
: inner(kj::mv(innerParam)), resultRef(resultRef) {
if (inner->onReady(*this)) armDepthFirst();
inner->onReady(*this);
}
Maybe<Own<Event>> ForkHubBase::fire() {
......@@ -652,19 +657,20 @@ _::PromiseNode* ForkHubBase::getInnerForTrace() {
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
: state(STEP1), inner(kj::mv(innerParam)) {
if (inner->onReady(*this)) armDepthFirst();
inner->onReady(*this);
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
bool ChainPromiseNode::onReady(Event& event) noexcept {
void ChainPromiseNode::onReady(Event& event) noexcept {
switch (state) {
case STEP1:
KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
onReadyEvent = &event;
return false;
return;
case STEP2:
return inner->onReady(event);
inner->onReady(event);
return;
}
KJ_UNREACHABLE;
}
......@@ -710,9 +716,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
state = STEP2;
if (onReadyEvent != nullptr) {
if (inner->onReady(*onReadyEvent)) {
onReadyEvent->armDepthFirst();
}
inner->onReady(*onReadyEvent);
}
return nullptr;
......@@ -725,8 +729,8 @@ ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<Pr
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
bool ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
return onReadyEvent.init(event);
void ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
onReadyEvent.init(event);
}
void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept {
......@@ -744,7 +748,7 @@ PromiseNode* ExclusiveJoinPromiseNode::getInnerForTrace() {
ExclusiveJoinPromiseNode::Branch::Branch(
ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
if (dependency->onReady(*this)) armDepthFirst();
dependency->onReady(*this);
}
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
......@@ -779,11 +783,11 @@ PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() {
EagerPromiseNodeBase::EagerPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
: dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
if (dependency->onReady(*this)) armDepthFirst();
dependency->onReady(*this);
}
bool EagerPromiseNodeBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event);
void EagerPromiseNodeBase::onReady(Event& event) noexcept {
onReadyEvent.init(event);
}
PromiseNode* EagerPromiseNodeBase::getInnerForTrace() {
......@@ -804,8 +808,8 @@ Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
// -------------------------------------------------------------------
bool AdapterPromiseNodeBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event);
void AdapterPromiseNodeBase::onReady(Event& event) noexcept {
onReadyEvent.init(event);
}
} // namespace _ (private)
......
......@@ -309,10 +309,14 @@ PromiseForResult<Func, void> evalLater(Func&& func);
// Example usage:
// Promise<int> x = evalLater([]() { return 123; });
//
// The above is exactly equivalent to:
// Promise<int> x = Promise<void>(READY_NOW).then([]() { return 123; });
//
// If the returned promise is destroyed before the callback runs, the callback will be canceled
// (never called).
//
// If you schedule several evaluations with `evalLater`, they will be executed in order.
// If you schedule several evaluations with `evalLater` during the same callback, they are
// guaranteed to be executed in order.
// =======================================================================================
// Hack for creating a lambda that holds an owned pointer.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment