Commit ece2a1aa authored by Kenton Varda's avatar Kenton Varda

Add WaitScope::poll() and Promise::poll() to pump all events.

These are useful in unit tests. Often, some tasks have been queued, and we want to test that a particular thing happens or doesn't happen as a result of those tasks, without actually waiting on any specific promise. We need a way to run the event queue until it is empty. For that, we add WaitScope::poll().

Or, sometimes, we want to check if a specific promise is (or is not!) resolved as a result of the tasks that have been queued so far, but we may not want to actulaly wait() on it for a couple reasons:
- We may actually want to verify that the promise is *not* resolved by tasks so far. wait()ing on it would be expected to deadlock.
- We may expect the promise to resolve, but would prefer not to deadlock if it doesn't resolve (we'd rather fail fast).

For this, we add Promise::poll().

There are lots of tests I've written which could be simplified by this, but for now I'm not refactoring any existing tests.

Note that adding these (particularly Promise::poll()) required updating the async framework a bit, in that this is the first case where PromiseNode::onReady() might be called multiple times, or might be canceled without destroying the PromiseNode entirely. Luckily this was not hard to account for. (We still have the rule that get() can only be called once, though.)
parent 73a01874
......@@ -136,8 +136,12 @@ class PromiseNode {
// internal implementation details.
public:
virtual void onReady(Event& event) noexcept = 0;
virtual void onReady(Event* event) noexcept = 0;
// Arms the given event when ready.
//
// May be called multiple times. If called again before the event was armed, the old event will
// never be armed, only the new one. If called again after the event was armed, the new event
// will be armed immediately. Can be called with nullptr to un-register the existing event.
virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept;
// Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
......@@ -159,8 +163,7 @@ protected:
// Helper class for implementing onReady().
public:
void init(Event& newEvent);
// Returns true if arm() was already called.
void init(Event* newEvent);
void arm();
// Arms the event if init() has already been called and makes future calls to init() return
......@@ -178,7 +181,7 @@ public:
ImmediatePromiseNodeBase();
~ImmediatePromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
};
template <typename T>
......@@ -212,7 +215,7 @@ class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -338,7 +341,7 @@ class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -410,7 +413,7 @@ public:
// Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
PromiseNode* getInnerForTrace() override;
protected:
......@@ -545,7 +548,7 @@ public:
explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -585,7 +588,7 @@ public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -619,7 +622,7 @@ public:
ExceptionOrValue* resultParts, size_t partSize);
~ArrayJoinPromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override final;
void onReady(Event* event) noexcept override final;
void get(ExceptionOrValue& output) noexcept override final;
PromiseNode* getInnerForTrace() override final;
......@@ -698,7 +701,7 @@ class EagerPromiseNodeBase: public PromiseNode, protected Event {
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
......@@ -735,7 +738,7 @@ Own<PromiseNode> spark(Own<PromiseNode>&& node) {
class AdapterPromiseNodeBase: public PromiseNode {
public:
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
protected:
inline void setReady() {
......@@ -854,7 +857,7 @@ template <typename T>
T Promise<T>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::FixVoid<T>> result;
waitImpl(kj::mv(node), result, waitScope);
_::waitImpl(kj::mv(node), result, waitScope);
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
......@@ -875,7 +878,7 @@ inline void Promise<void>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::Void> result;
waitImpl(kj::mv(node), result, waitScope);
_::waitImpl(kj::mv(node), result, waitScope);
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
......@@ -889,6 +892,11 @@ inline void Promise<void>::wait(WaitScope& waitScope) {
}
}
template <typename T>
bool Promise<T>::poll(WaitScope& waitScope) {
return _::pollImpl(*node, waitScope);
}
template <typename T>
ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
......
......@@ -199,6 +199,7 @@ private:
void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope);
Promise<void> yield();
Own<PromiseNode> neverDone();
......
......@@ -748,5 +748,16 @@ TEST(Async, SetRunnable) {
}
}
TEST(Async, Poll) {
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<void>();
KJ_ASSERT(!paf.promise.poll(waitScope));
paf.fulfiller->fulfill();
KJ_ASSERT(paf.promise.poll(waitScope));
paf.promise.wait(waitScope);
}
} // namespace
} // namespace kj
......@@ -66,8 +66,8 @@ public:
class YieldPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event& event) noexcept override {
event.armBreadthFirst();
void onReady(_::Event* event) noexcept override {
if (event) event->armBreadthFirst();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
......@@ -76,7 +76,7 @@ public:
class NeverDonePromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event& event) noexcept override {
void onReady(_::Event* event) noexcept override {
// ignore
}
void get(_::ExceptionOrValue& output) noexcept override {
......@@ -108,7 +108,7 @@ public:
Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
: taskSet(taskSet), node(kj::mv(nodeParam)) {
node->setSelfPointer(&node);
node->onReady(*this);
node->onReady(this);
}
protected:
......@@ -312,6 +312,26 @@ void EventLoop::leaveScope() {
threadLocalEventLoop = nullptr;
}
void WaitScope::poll() {
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
loop.running = true;
KJ_DEFER(loop.running = false);
for (;;) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
if (!loop.isRunnable()) {
// Still no events in the queue. We're done.
return;
}
}
}
}
namespace _ { // private
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) {
......@@ -321,7 +341,7 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
BoolEvent doneEvent;
node->setSelfPointer(&node);
node->onReady(doneEvent);
node->onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
......@@ -343,6 +363,35 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
}
}
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
EventLoop& loop = waitScope.loop;
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
BoolEvent doneEvent;
node.onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
while (!doneEvent.fired) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up.
node.onReady(nullptr);
loop.setRunnable(false);
return false;
}
}
}
loop.setRunnable(loop.isRunnable());
return true;
}
Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
......@@ -498,26 +547,28 @@ void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
void PromiseNode::OnReadyEvent::init(Event& newEvent) {
void PromiseNode::OnReadyEvent::init(Event* newEvent) {
if (event == _kJ_ALREADY_READY) {
// A new continuation was added to a promise that was already ready. In this case, we schedule
// breadth-first, to make it difficult for applications to accidentally starve the event loop
// by repeatedly waiting on immediate promises.
newEvent.armBreadthFirst();
if (newEvent) newEvent->armBreadthFirst();
} else {
event = &newEvent;
event = newEvent;
}
}
void PromiseNode::OnReadyEvent::arm() {
if (event == nullptr) {
event = _kJ_ALREADY_READY;
} else {
KJ_ASSERT(event != _kJ_ALREADY_READY, "arm() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it. In this case, arm in depth-first
// order so that the event runs immediately after the current one. This way, chained promises
// execute together for better cache locality and lower latency.
event->armDepthFirst();
}
event = _kJ_ALREADY_READY;
}
// -------------------------------------------------------------------
......@@ -525,8 +576,8 @@ void PromiseNode::OnReadyEvent::arm() {
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
event.armBreadthFirst();
void ImmediatePromiseNodeBase::onReady(Event* event) noexcept {
if (event) event->armBreadthFirst();
}
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
......@@ -543,7 +594,7 @@ AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependen
dependency->setSelfPointer(&dependency);
}
void AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
void AttachmentPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
......@@ -567,7 +618,7 @@ TransformPromiseNodeBase::TransformPromiseNodeBase(
dependency->setSelfPointer(&dependency);
}
void TransformPromiseNodeBase::onReady(Event& event) noexcept {
void TransformPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
......@@ -635,7 +686,7 @@ void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
}
}
void ForkBranchBase::onReady(Event& event) noexcept {
void ForkBranchBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -648,7 +699,7 @@ PromiseNode* ForkBranchBase::getInnerForTrace() {
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
: inner(kj::mv(innerParam)), resultRef(resultRef) {
inner->setSelfPointer(&inner);
inner->onReady(*this);
inner->onReady(this);
}
Maybe<Own<Event>> ForkHubBase::fire() {
......@@ -682,16 +733,16 @@ _::PromiseNode* ForkHubBase::getInnerForTrace() {
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
: state(STEP1), inner(kj::mv(innerParam)) {
inner->setSelfPointer(&inner);
inner->onReady(*this);
inner->onReady(this);
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
void ChainPromiseNode::onReady(Event& event) noexcept {
void ChainPromiseNode::onReady(Event* event) noexcept {
switch (state) {
case STEP1:
KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
onReadyEvent = &event;
onReadyEvent = event;
return;
case STEP2:
inner->onReady(event);
......@@ -755,7 +806,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
*selfPtr = kj::mv(inner);
selfPtr->get()->setSelfPointer(selfPtr);
if (onReadyEvent != nullptr) {
selfPtr->get()->onReady(*onReadyEvent);
selfPtr->get()->onReady(onReadyEvent);
}
// Return our self-pointer so that the caller takes care of deleting it.
......@@ -763,7 +814,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
} else {
inner->setSelfPointer(&inner);
if (onReadyEvent != nullptr) {
inner->onReady(*onReadyEvent);
inner->onReady(onReadyEvent);
}
return nullptr;
......@@ -777,7 +828,7 @@ ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<Pr
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
void ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
void ExclusiveJoinPromiseNode::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -797,7 +848,7 @@ ExclusiveJoinPromiseNode::Branch::Branch(
ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
......@@ -847,7 +898,7 @@ ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
}
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
void ArrayJoinPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -873,7 +924,7 @@ ArrayJoinPromiseNodeBase::Branch::Branch(
ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
......@@ -921,10 +972,10 @@ EagerPromiseNodeBase::EagerPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
: dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
void EagerPromiseNodeBase::onReady(Event& event) noexcept {
void EagerPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -946,7 +997,7 @@ Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
// -------------------------------------------------------------------
void AdapterPromiseNodeBase::onReady(Event& event) noexcept {
void AdapterPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......
......@@ -235,6 +235,19 @@ public:
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
bool poll(WaitScope& waitScope);
// Returns true if a call to wait() would complete without blocking, false if it would block.
//
// If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an
// attempt to resolve it. Only when there is nothing left to do will it return false.
//
// Generally, poll() is most useful in tests. Often, you may want to verify that a promise does
// not resolve until some specific event occurs. To do so, poll() the promise before the event to
// verify it isn't resolved, then trigger the event, the poll() again to verify that it resolves.
// The first poll() verifies that the promise doesn't resolve early, which would otherwise be
// hard to do deterministically. The second poll() allows you to check that the promise has
// resolved and avoid a wait() that might deadlock in the case that it hasn't.
ForkedPromise<T> fork() KJ_WARN_UNUSED_RESULT;
// Forks the promise, so that multiple different clients can independently wait on the result.
// `T` must be copy-constructable for this to work. Or, in the special case where `T` is
......@@ -650,6 +663,7 @@ private:
friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
friend class _::Event;
friend class WaitScope;
};
......@@ -668,11 +682,15 @@ public:
inline ~WaitScope() { loop.leaveScope(); }
KJ_DISALLOW_COPY(WaitScope);
void poll();
// Pumps the event queue and polls for I/O until there's nothing left to do (without blocking).
private:
EventLoop& loop;
friend class EventLoop;
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
};
} // namespace kj
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment