Commit 2dbf31da authored by Kenton Varda's avatar Kenton Varda

Re-arrange async headers a bit to improve organization.

parent 4158ca9f
...@@ -136,7 +136,9 @@ includekj_HEADERS = \ ...@@ -136,7 +136,9 @@ includekj_HEADERS = \
src/kj/mutex.h \ src/kj/mutex.h \
src/kj/thread.h \ src/kj/thread.h \
src/kj/work-queue.h \ src/kj/work-queue.h \
src/kj/async-prelude.h \
src/kj/async.h \ src/kj/async.h \
src/kj/async-impl.h \
src/kj/async-unix.h \ src/kj/async-unix.h \
src/kj/async-io.h \ src/kj/async-io.h \
src/kj/main.h src/kj/main.h
......
...@@ -207,7 +207,7 @@ public: ...@@ -207,7 +207,7 @@ public:
daemonPromise.attach(kj::addRef(*context)); daemonPromise.attach(kj::addRef(*context));
daemonPromise.exclusiveJoin(kj::mv(cancelPaf.promise)); daemonPromise.exclusiveJoin(kj::mv(cancelPaf.promise));
// Daemonize, ignoring exceptions. // Daemonize, ignoring exceptions.
kj::daemonize(kj::mv(daemonPromise), [](kj::Exception&&) {}); daemonPromise.daemonize([](kj::Exception&&) {});
// Now the other branch returns the response from the context. // Now the other branch returns the response from the context.
auto contextPtr = context.get(); auto contextPtr = context.get();
......
...@@ -807,7 +807,7 @@ void doTest() { ...@@ -807,7 +807,7 @@ void doTest() {
uint nextOrdinal = 0; uint nextOrdinal = 0;
for (uint i = 0; i < 128; i++) { for (uint i = 0; i < 96; i++) {
uint oldOrdinalCount = nextOrdinal; uint oldOrdinalCount = nextOrdinal;
auto newBuilder = kj::heap<MallocMessageBuilder>(); auto newBuilder = kj::heap<MallocMessageBuilder>();
......
...@@ -2360,7 +2360,7 @@ private: ...@@ -2360,7 +2360,7 @@ private:
auto promise = kj::mv(cancelPaf.promise); auto promise = kj::mv(cancelPaf.promise);
promise.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){})); promise.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}));
daemonize(kj::mv(promise), [](kj::Exception&&) {}); promise.daemonize([](kj::Exception&&) {});
} else { } else {
// Hack: Both the success and error continuations need to use the context. We could // Hack: Both the success and error continuations need to use the context. We could
// refcount, but both will be destroyed at the same time anyway. // refcount, but both will be destroyed at the same time anyway.
...@@ -2377,7 +2377,7 @@ private: ...@@ -2377,7 +2377,7 @@ private:
}); });
promise.attach(kj::mv(context)); promise.attach(kj::mv(context));
promise.exclusiveJoin(kj::mv(cancelPaf.promise)); promise.exclusiveJoin(kj::mv(cancelPaf.promise));
daemonize(kj::mv(promise), [](kj::Exception&&) {}); promise.daemonize([](kj::Exception&&) {});
} }
} }
} }
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// This file contains extended inline implementation details that are required along with async.h.
// We move this all into a separate file to make async.h more readable.
//
// Non-inline declarations here are defined in async.c++.
#ifndef KJ_ASYNC_H_
#error "Do not include this directly; include kj/async.h."
#endif
#ifndef KJ_ASYNC_INL_H_
#define KJ_ASYNC_INL_H_
namespace kj {
namespace _ { // private
template <typename T>
class ExceptionOr;
class ExceptionOrValue {
public:
ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
KJ_DISALLOW_COPY(ExceptionOrValue);
void addException(Exception&& exception) {
if (this->exception == nullptr) {
this->exception = kj::mv(exception);
}
}
template <typename T>
ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
template <typename T>
const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
Maybe<Exception> exception;
protected:
// Allow subclasses to have move constructor / assignment.
ExceptionOrValue() = default;
ExceptionOrValue(ExceptionOrValue&& other) = default;
ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
};
template <typename T>
class ExceptionOr: public ExceptionOrValue {
public:
ExceptionOr() = default;
ExceptionOr(T&& value): value(kj::mv(value)) {}
ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
ExceptionOr(ExceptionOr&&) = default;
ExceptionOr& operator=(ExceptionOr&&) = default;
Maybe<T> value;
};
class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this
// internally.
public:
Event();
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void armDepthFirst();
// Enqueue this event so that `fire()` will be called from the event loop soon.
//
// Events scheduled in this way are executed in depth-first order: if an event callback arms
// more events, those events are placed at the front of the queue (in the order in which they
// were armed), so that they run immediately after the first event's callback returns.
//
// Depth-first event scheduling is appropriate for events that represent simple continuations
// of a previous event that should be globbed together for performance. Depth-first scheduling
// can lead to starvation, so any long-running task must occasionally yield with
// `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
// breadth-first.)
//
// To use breadth-first scheduling instead, use `armLater()`.
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
kj::String trace();
// Dump debug info about this event.
virtual _::PromiseNode* getInnerForTrace();
// If this event wraps a PromiseNode, get that node. Used for debug tracing.
// Default implementation returns nullptr.
protected:
virtual Maybe<Own<Event>> fire() = 0;
// Fire the event. Possibly returns a pointer to itself, which will be discarded by the
// caller. This is the only way that an event can delete itself as a result of firing, as
// doing so from within fire() will throw an exception.
private:
friend class kj::EventLoop;
EventLoop& loop;
Event* next;
Event** prev;
bool firing = false;
};
class PromiseNode {
// A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
//
// To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky
// use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
// so down-cast in the few places that really need to be templated. Luckily this is all
// internal implementation details.
public:
virtual bool onReady(Event& event) noexcept = 0;
// Returns true if already ready, otherwise arms the given event when ready.
virtual void get(ExceptionOrValue& output) noexcept = 0;
// Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
// Can only be called once, and only after the node is ready. Must be called directly from the
// event loop, with no application code on the stack.
virtual PromiseNode* getInnerForTrace();
// If this node wraps some other PromiseNode, get the wrapped node. Used for debug tracing.
// Default implementation returns nullptr.
protected:
class OnReadyEvent {
// Helper class for implementing onReady().
public:
bool init(Event& newEvent);
// Returns true if arm() was already called.
void arm();
// Arms the event if init() has already been called and makes future calls to init() return
// true.
private:
Event* event = nullptr;
};
};
// -------------------------------------------------------------------
class ImmediatePromiseNodeBase: public PromiseNode {
public:
bool onReady(Event& event) noexcept override;
};
template <typename T>
class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
// A promise that has already been resolved to an immediate value or exception.
public:
ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
public:
ImmediateBrokenPromiseNode(Exception&& exception);
void get(ExceptionOrValue& output) noexcept override;
private:
Exception exception;
};
// -------------------------------------------------------------------
class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
void dropDependency();
template <typename>
friend class AttachmentPromiseNode;
};
template <typename Attachment>
class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
// A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
// object) until the promise resolves.
public:
AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment)
: AttachmentPromiseNodeBase(kj::mv(dependency)),
attachment(kj::mv<Attachment>(attachment)) {}
~AttachmentPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the attachment because the
// dependency may be using the attachment.
dropDependency();
}
private:
Attachment attachment;
};
// -------------------------------------------------------------------
class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
void dropDependency();
void getDepResult(ExceptionOrValue& output);
virtual void getImpl(ExceptionOrValue& output) = 0;
template <typename, typename, typename, typename>
friend class TransformPromiseNode;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc>
class TransformPromiseNode final: public TransformPromiseNodeBase {
// A PromiseNode that transforms the result of another PromiseNode through an application-provided
// function (implements `then()`).
public:
TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler)
: TransformPromiseNodeBase(kj::mv(dependency)),
func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
~TransformPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the continuations because it
// is a common pattern for the continuations to hold ownership of objects that might be in-use
// by the dependency.
dropDependency();
}
private:
Func func;
ErrorFunc errorHandler;
void getImpl(ExceptionOrValue& output) override {
ExceptionOr<DepT> depResult;
getDepResult(depResult);
KJ_IF_MAYBE(depException, depResult.exception) {
output.as<T>() = handle(
MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
errorHandler, kj::mv(*depException)));
} else KJ_IF_MAYBE(depValue, depResult.value) {
output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
}
}
ExceptionOr<T> handle(T&& value) {
return kj::mv(value);
}
ExceptionOr<T> handle(PropagateException::Bottom&& value) {
return ExceptionOr<T>(false, value.asException());
}
};
// -------------------------------------------------------------------
class ForkHubBase;
class ForkBranchBase: public PromiseNode {
public:
ForkBranchBase(Own<ForkHubBase>&& hub);
~ForkBranchBase() noexcept(false);
void hubReady() noexcept;
// Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------
bool onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override;
protected:
inline ExceptionOrValue& getHubResultRef();
void releaseHub(ExceptionOrValue& output);
// Release the hub. If an exception is thrown, add it to `output`.
private:
OnReadyEvent onReadyEvent;
Own<ForkHubBase> hub;
ForkBranchBase* next = nullptr;
ForkBranchBase** prevPtr = nullptr;
friend class ForkHubBase;
};
template <typename T> T copyOrAddRef(T& t) { return t; }
template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
template <typename T>
class ForkBranch final: public ForkBranchBase {
// A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
// a const reference.
public:
ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
void get(ExceptionOrValue& output) noexcept override {
ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
KJ_IF_MAYBE(value, hubResult.value) {
output.as<T>().value = copyOrAddRef(*value);
} else {
output.as<T>().value = nullptr;
}
output.exception = hubResult.exception;
releaseHub(output);
}
};
// -------------------------------------------------------------------
class ForkHubBase: public Refcounted, protected Event {
public:
ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef);
inline ExceptionOrValue& getResultRef() { return resultRef; }
private:
Own<PromiseNode> inner;
ExceptionOrValue& resultRef;
ForkBranchBase* headBranch = nullptr;
ForkBranchBase** tailBranch = &headBranch;
// Tail becomes null once the inner promise is ready and all branches have been notified.
Maybe<Own<Event>> fire() override;
_::PromiseNode* getInnerForTrace() override;
friend class ForkBranchBase;
};
template <typename T>
class ForkHub final: public ForkHubBase {
// A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces
// the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
// possible).
public:
ForkHub(Own<PromiseNode>&& inner): ForkHubBase(kj::mv(inner), result) {}
Promise<_::UnfixVoid<T>> addBranch() {
return Promise<_::UnfixVoid<T>>(false, kj::heap<ForkBranch<T>>(addRef(*this)));
}
private:
ExceptionOr<T> result;
};
inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
return hub->getResultRef();
}
// -------------------------------------------------------------------
class ChainPromiseNode final: public PromiseNode, private Event {
public:
explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
enum State {
STEP1,
STEP2
};
State state;
Own<PromiseNode> inner;
// In PRE_STEP1 / STEP1, a PromiseNode for a Promise<T>.
// In STEP2, a PromiseNode for a T.
Event* onReadyEvent = nullptr;
Maybe<Own<Event>> fire() override;
};
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) {
return heap<ChainPromiseNode>(kj::mv(node));
}
template <typename T>
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) {
return kj::mv(node);
}
// -------------------------------------------------------------------
class ExclusiveJoinPromiseNode final: public PromiseNode {
public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
class Branch: public Event {
public:
Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency);
~Branch() noexcept(false);
bool get(ExceptionOrValue& output);
// Returns true if this is the side that finished.
Maybe<Own<Event>> fire() override;
_::PromiseNode* getInnerForTrace() override;
private:
ExclusiveJoinPromiseNode& joinNode;
Own<PromiseNode> dependency;
};
Branch left;
Branch right;
OnReadyEvent onReadyEvent;
};
// -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected Event {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
// evaluate it.
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
bool onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
OnReadyEvent onReadyEvent;
ExceptionOrValue& resultRef;
Maybe<Own<Event>> fire() override;
};
template <typename T>
class EagerPromiseNode final: public EagerPromiseNodeBase {
public:
EagerPromiseNode(Own<PromiseNode>&& dependency)
: EagerPromiseNodeBase(kj::mv(dependency), result) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
template <typename T>
Own<PromiseNode> spark(Own<PromiseNode>&& node) {
// Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
// on it.
return heap<EagerPromiseNode<T>>(kj::mv(node));
}
// -------------------------------------------------------------------
class AdapterPromiseNodeBase: public PromiseNode {
public:
bool onReady(Event& event) noexcept override;
protected:
inline void setReady() {
onReadyEvent.arm();
}
private:
OnReadyEvent onReadyEvent;
};
template <typename T, typename Adapter>
class AdapterPromiseNode final: public AdapterPromiseNodeBase,
private PromiseFulfiller<UnfixVoid<T>> {
// A PromiseNode that wraps a PromiseAdapter.
public:
template <typename... Params>
AdapterPromiseNode(Params&&... params)
: adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
void get(ExceptionOrValue& output) noexcept override {
KJ_IREQUIRE(!isWaiting());
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
bool waiting = true;
Adapter adapter;
void fulfill(T&& value) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(kj::mv(value));
setReady();
}
}
void reject(Exception&& exception) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(false, kj::mv(exception));
setReady();
}
}
bool isWaiting() override {
return waiting;
}
};
} // namespace _ (private)
// =======================================================================================
template <typename T>
T EventLoop::wait(Promise<T>&& promise) {
_::ExceptionOr<_::FixVoid<T>> result;
waitImpl(kj::mv(promise.node), result);
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
template <typename Func>
PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) {
// Invoke thenImpl() on yield(). Always spark the result.
return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
thenImpl(yield(), kj::fwd<Func>(func), _::PropagateException())));
}
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> EventLoop::thenImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler) {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
kj::mv(promise.node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr));
}
template <typename T>
Promise<T>::Promise(_::FixVoid<T> value)
: PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
template <typename T>
Promise<T>::Promise(kj::Exception&& exception)
: PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
template <typename T>
template <typename Func, typename ErrorFunc>
PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) {
return PromiseForResult<Func, T>(false, EventLoop::current().thenImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)));
}
template <typename T>
T Promise<T>::wait() {
return EventLoop::current().wait(kj::mv(*this));
}
template <typename T>
ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
}
template <typename T>
Promise<T> ForkedPromise<T>::addBranch() {
return hub->addBranch();
}
template <typename T>
void Promise<T>::exclusiveJoin(Promise<T>&& other) {
node = heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node));
}
template <typename T>
template <typename... Attachments>
void Promise<T>::attach(Attachments&&... attachments) {
node = kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...));
}
template <typename T>
void Promise<T>::eagerlyEvaluate() {
node = _::spark<_::FixVoid<T>>(kj::mv(node));
}
template <typename T>
kj::String Promise<T>::trace() {
return PromiseBase::trace();
}
template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) {
return EventLoop::current().evalLater(kj::fwd<Func>(func));
}
template <typename T>
template <typename ErrorFunc>
void Promise<T>::daemonize(ErrorFunc&& errorHandler) {
return EventLoop::current().daemonize(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
}
template <>
template <typename ErrorFunc>
void Promise<void>::daemonize(ErrorFunc&& errorHandler) {
return EventLoop::current().daemonize(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
}
// =======================================================================================
namespace _ { // private
template <typename T>
class WeakFulfiller final: public PromiseFulfiller<T>, private kj::Disposer {
// A wrapper around PromiseFulfiller which can be detached.
//
// There are a couple non-trivialities here:
// - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
// rejected.
// - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
// detached from the underlying fulfiller, because otherwise the later detach() call will go
// to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
// refcount never goes over 2 and we manually implement the refcounting because we already need
// a mutex anyway. To this end, WeakFulfiller is its own Disposer -- dispose() is called when
// the application discards its owned pointer to the fulfiller and detach() is called when the
// promise is destroyed.
public:
KJ_DISALLOW_COPY(WeakFulfiller);
static kj::Own<WeakFulfiller> make() {
WeakFulfiller* ptr = new WeakFulfiller;
return Own<WeakFulfiller>(ptr, *ptr);
}
void fulfill(FixVoid<T>&& value) override {
if (inner != nullptr) {
inner->fulfill(kj::mv(value));
}
}
void reject(Exception&& exception) override {
if (inner != nullptr) {
inner->reject(kj::mv(exception));
}
}
bool isWaiting() override {
return inner != nullptr && inner->isWaiting();
}
void attach(PromiseFulfiller<T>& newInner) {
inner = &newInner;
}
void detach(PromiseFulfiller<T>& from) {
if (inner == nullptr) {
// Already disposed.
delete this;
} else {
KJ_IREQUIRE(inner == &from);
inner = nullptr;
}
}
private:
mutable PromiseFulfiller<T>* inner;
WeakFulfiller(): inner(nullptr) {}
void disposeImpl(void* pointer) const override {
// TODO(perf): Factor some of this out so it isn't regenerated for every fulfiller type?
if (inner == nullptr) {
// Already detached.
delete this;
} else {
if (inner->isWaiting()) {
inner->reject(kj::Exception(
kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__,
kj::heapString("PromiseFulfiller was destroyed without fulfilling the promise.")));
}
inner = nullptr;
}
}
};
template <typename T>
class PromiseAndFulfillerAdapter {
public:
PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
WeakFulfiller<T>& wrapper)
: fulfiller(fulfiller), wrapper(wrapper) {
wrapper.attach(fulfiller);
}
~PromiseAndFulfillerAdapter() noexcept(false) {
wrapper.detach(fulfiller);
}
private:
PromiseFulfiller<T>& fulfiller;
WeakFulfiller<T>& wrapper;
};
} // namespace _ (private)
template <typename T, typename Adapter, typename... Params>
Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
return Promise<T>(false, heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
kj::fwd<Params>(adapterConstructorParams)...));
}
template <typename T>
PromiseFulfillerPair<T> newPromiseAndFulfiller() {
auto wrapper = _::WeakFulfiller<T>::make();
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
Promise<_::JoinPromises<T>> promise(false,
_::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr)));
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
}
} // namespace kj
#endif // KJ_ASYNC_INL_H_
...@@ -41,14 +41,14 @@ TEST(AsyncIo, SimpleNetwork) { ...@@ -41,14 +41,14 @@ TEST(AsyncIo, SimpleNetwork) {
auto port = newPromiseAndFulfiller<uint>(); auto port = newPromiseAndFulfiller<uint>();
daemonize(port.promise.then([&](uint portnum) { port.promise.then([&](uint portnum) {
return network->parseRemoteAddress("127.0.0.1", portnum); return network->parseRemoteAddress("127.0.0.1", portnum);
}).then([&](Own<RemoteAddress>&& result) { }).then([&](Own<RemoteAddress>&& result) {
return result->connect(); return result->connect();
}).then([&](Own<AsyncIoStream>&& result) { }).then([&](Own<AsyncIoStream>&& result) {
client = kj::mv(result); client = kj::mv(result);
return client->write("foo", 3); return client->write("foo", 3);
}), [](kj::Exception&& exception) { }).daemonize([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr(); ADD_FAILURE() << kj::str(exception).cStr();
}); });
...@@ -97,7 +97,7 @@ TEST(AsyncIo, OneWayPipe) { ...@@ -97,7 +97,7 @@ TEST(AsyncIo, OneWayPipe) {
auto pipe = newOneWayPipe(); auto pipe = newOneWayPipe();
char receiveBuffer[4]; char receiveBuffer[4];
daemonize(pipe.out->write("foo", 3), [](kj::Exception&& exception) { pipe.out->write("foo", 3).daemonize([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr(); ADD_FAILURE() << kj::str(exception).cStr();
}); });
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// This file contains a bunch of internal declarations that must appear before async.h can start.
// We don't define these directly in async.h because it makes the file hard to read.
#ifndef KJ_ASYNC_PRELUDE_H_
#define KJ_ASYNC_PRELUDE_H_
#include "exception.h"
namespace kj {
class EventLoop;
template <typename T>
class Promise;
namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; };
template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
// arbitrary type which simply propagates the exception.
public:
class Bottom {
public:
Bottom(Exception&& exception): exception(kj::mv(exception)) {}
Exception asException() { return kj::mv(exception); }
private:
Exception exception;
};
Bottom operator()(Exception&& e) {
return Bottom(kj::mv(e));
}
Bottom operator()(const Exception& e) {
return Bottom(kj::cp(e));
}
};
template <typename Func, typename T>
struct ReturnType_ { typedef decltype(instance<Func>()(instance<T>())) Type; };
template <typename Func>
struct ReturnType_<Func, void> { typedef decltype(instance<Func>()()) Type; };
template <typename Func, typename T>
using ReturnType = typename ReturnType_<Func, T>::Type;
// The return type of functor Func given a parameter of type T, with the special exception that if
// T is void, this is the return type of Func called with no arguments.
struct Void {};
// Application code should NOT refer to this! See `kj::READY_NOW` instead.
template <typename T> struct FixVoid_ { typedef T Type; };
template <> struct FixVoid_<void> { typedef Void Type; };
template <typename T> using FixVoid = typename FixVoid_<T>::Type;
// FixVoid<T> is just T unless T is void in which case it is _::Void (an empty struct).
template <typename T> struct UnfixVoid_ { typedef T Type; };
template <> struct UnfixVoid_<Void> { typedef void Type; };
template <typename T> using UnfixVoid = typename UnfixVoid_<T>::Type;
// UnfixVoid is the opposite of FixVoid.
template <typename In, typename Out>
struct MaybeVoidCaller {
// Calls the function converting a Void input to an empty parameter list and a void return
// value to a Void output.
template <typename Func>
static inline Out apply(Func& func, In&& in) {
return func(kj::mv(in));
}
};
template <typename In, typename Out>
struct MaybeVoidCaller<In&, Out> {
template <typename Func>
static inline Out apply(Func& func, In& in) {
return func(in);
}
};
template <typename Out>
struct MaybeVoidCaller<Void, Out> {
template <typename Func>
static inline Out apply(Func& func, Void&& in) {
return func();
}
};
template <typename In>
struct MaybeVoidCaller<In, Void> {
template <typename Func>
static inline Void apply(Func& func, In&& in) {
func(kj::mv(in));
return Void();
}
};
template <typename In>
struct MaybeVoidCaller<In&, Void> {
template <typename Func>
static inline Void apply(Func& func, In& in) {
func(in);
return Void();
}
};
template <>
struct MaybeVoidCaller<Void, Void> {
template <typename Func>
static inline Void apply(Func& func, Void&& in) {
func();
return Void();
}
};
template <typename T>
inline T&& returnMaybeVoid(T&& t) {
return kj::fwd<T>(t);
}
inline void returnMaybeVoid(Void&& v) {}
class ExceptionOrValue;
class PromiseNode;
class ChainPromiseNode;
template <typename T>
class ForkHub;
class TaskSetImpl;
class Event;
class PromiseBase {
public:
kj::String trace();
// Dump debug info about this promise.
private:
Own<PromiseNode> node;
PromiseBase() = default;
PromiseBase(Own<PromiseNode>&& node): node(kj::mv(node)) {}
friend class kj::EventLoop;
friend class ChainPromiseNode;
template <typename>
friend class kj::Promise;
friend class TaskSetImpl;
};
} // namespace _ (private)
} // namespace kj
#endif // KJ_ASYNC_PRELUDE_H_
...@@ -194,7 +194,7 @@ TEST(Async, SeparateFulfillerCanceled) { ...@@ -194,7 +194,7 @@ TEST(Async, SeparateFulfillerCanceled) {
auto pair = newPromiseAndFulfiller<void>(); auto pair = newPromiseAndFulfiller<void>();
EXPECT_TRUE(pair.fulfiller->isWaiting()); EXPECT_TRUE(pair.fulfiller->isWaiting());
pair.promise.absolve(); pair.promise = nullptr;
EXPECT_FALSE(pair.fulfiller->isWaiting()); EXPECT_FALSE(pair.fulfiller->isWaiting());
} }
...@@ -473,8 +473,8 @@ TEST(Async, Daemonize) { ...@@ -473,8 +473,8 @@ TEST(Async, Daemonize) {
bool ran3 = false; bool ran3 = false;
evalLater([&]() { ran1 = true; }); evalLater([&]() { ran1 = true; });
daemonize(evalLater([&]() { ran2 = true; }), [](kj::Exception&&) { ADD_FAILURE(); }); evalLater([&]() { ran2 = true; }).daemonize([](kj::Exception&&) { ADD_FAILURE(); });
daemonize(evalLater([]() { KJ_FAIL_ASSERT("foo"); }), [&](kj::Exception&& e) { ran3 = true; }); evalLater([]() { KJ_FAIL_ASSERT("foo"); }).daemonize([&](kj::Exception&& e) { ran3 = true; });
EXPECT_FALSE(ran1); EXPECT_FALSE(ran1);
EXPECT_FALSE(ran2); EXPECT_FALSE(ran2);
......
...@@ -84,9 +84,9 @@ TEST_F(AsyncUnixTest, SignalWithValue) { ...@@ -84,9 +84,9 @@ TEST_F(AsyncUnixTest, SignalWithValue) {
TEST_F(AsyncUnixTest, SignalsMultiListen) { TEST_F(AsyncUnixTest, SignalsMultiListen) {
UnixEventLoop loop; UnixEventLoop loop;
daemonize(loop.onSignal(SIGIO).then([](siginfo_t&&) { loop.onSignal(SIGIO).then([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal."; ADD_FAILURE() << "Received wrong signal.";
}), [](kj::Exception&& exception) { }).daemonize([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr(); ADD_FAILURE() << kj::str(exception).cStr();
}); });
...@@ -147,10 +147,10 @@ TEST_F(AsyncUnixTest, PollMultiListen) { ...@@ -147,10 +147,10 @@ TEST_F(AsyncUnixTest, PollMultiListen) {
KJ_SYSCALL(pipe(bogusPipefds)); KJ_SYSCALL(pipe(bogusPipefds));
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); }); KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
daemonize(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) { loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) {
KJ_DBG(s); KJ_DBG(s);
ADD_FAILURE() << "Received wrong poll."; ADD_FAILURE() << "Received wrong poll.";
}), [](kj::Exception&& exception) { }).daemonize([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr(); ADD_FAILURE() << kj::str(exception).cStr();
}); });
......
...@@ -67,6 +67,16 @@ public: ...@@ -67,6 +67,16 @@ public:
} }
}; };
class NeverReadyPromiseNode final: public _::PromiseNode {
public:
bool onReady(_::Event& event) noexcept override {
return false;
}
void get(_::ExceptionOrValue& output) noexcept override {
KJ_FAIL_REQUIRE("Not ready.");
}
};
} // namespace } // namespace
namespace _ { // private namespace _ { // private
...@@ -210,6 +220,12 @@ EventLoop::~EventLoop() noexcept(false) { ...@@ -210,6 +220,12 @@ EventLoop::~EventLoop() noexcept(false) {
} }
} }
void EventLoop::runForever() {
_::ExceptionOr<_::Void> result;
waitImpl(kj::heap<NeverReadyPromiseNode>(), result);
KJ_UNREACHABLE;
}
void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result) { void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result) {
KJ_REQUIRE(threadLocalEventLoop == this, KJ_REQUIRE(threadLocalEventLoop == this,
"Can only call wait() in the thread that created this EventLoop."); "Can only call wait() in the thread that created this EventLoop.");
...@@ -452,14 +468,6 @@ void SimpleEventLoop::wake() const { ...@@ -452,14 +468,6 @@ void SimpleEventLoop::wake() const {
// ======================================================================================= // =======================================================================================
void PromiseBase::absolve() {
runCatchingExceptions([this]() { node = nullptr; });
}
kj::String PromiseBase::trace() {
return traceImpl(nullptr, node);
}
TaskSet::TaskSet(ErrorHandler& errorHandler) TaskSet::TaskSet(ErrorHandler& errorHandler)
: impl(heap<_::TaskSetImpl>(errorHandler)) {} : impl(heap<_::TaskSetImpl>(errorHandler)) {}
...@@ -475,6 +483,10 @@ kj::String TaskSet::trace() { ...@@ -475,6 +483,10 @@ kj::String TaskSet::trace() {
namespace _ { // private namespace _ { // private
kj::String PromiseBase::trace() {
return traceImpl(nullptr, node);
}
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; } PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
bool PromiseNode::OnReadyEvent::init(Event& newEvent) { bool PromiseNode::OnReadyEvent::init(Event& newEvent) {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#ifndef KJ_ASYNC_H_ #ifndef KJ_ASYNC_H_
#define KJ_ASYNC_H_ #define KJ_ASYNC_H_
#include "async-prelude.h"
#include "exception.h" #include "exception.h"
#include "mutex.h" #include "mutex.h"
#include "refcount.h" #include "refcount.h"
...@@ -43,311 +44,26 @@ class PromiseFulfiller; ...@@ -43,311 +44,26 @@ class PromiseFulfiller;
template <typename T> template <typename T>
struct PromiseFulfillerPair; struct PromiseFulfillerPair;
// =======================================================================================
// ***************************************************************************************
// This section contains various internal stuff that needs to be declared upfront.
// Scroll down to `class EventLoop` or `class Promise` for the public interfaces.
// ***************************************************************************************
// =======================================================================================
namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; };
template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
// arbitrary type which simply propagates the exception.
public:
class Bottom {
public:
Bottom(Exception&& exception): exception(kj::mv(exception)) {}
Exception asException() { return kj::mv(exception); }
private:
Exception exception;
};
Bottom operator()(Exception&& e) {
return Bottom(kj::mv(e));
}
Bottom operator()(const Exception& e) {
return Bottom(kj::cp(e));
}
};
template <typename Func, typename T>
struct ReturnType_ { typedef decltype(instance<Func>()(instance<T>())) Type; };
template <typename Func>
struct ReturnType_<Func, void> { typedef decltype(instance<Func>()()) Type; };
template <typename Func, typename T>
using ReturnType = typename ReturnType_<Func, T>::Type;
// The return type of functor Func given a parameter of type T, with the special exception that if
// T is void, this is the return type of Func called with no arguments.
struct Void {};
// Application code should NOT refer to this! See `kj::READY_NOW` instead.
template <typename T> struct FixVoid_ { typedef T Type; };
template <> struct FixVoid_<void> { typedef Void Type; };
template <typename T> using FixVoid = typename FixVoid_<T>::Type;
// FixVoid<T> is just T unless T is void in which case it is _::Void (an empty struct).
template <typename T> struct UnfixVoid_ { typedef T Type; };
template <> struct UnfixVoid_<Void> { typedef void Type; };
template <typename T> using UnfixVoid = typename UnfixVoid_<T>::Type;
// UnfixVoid is the opposite of FixVoid.
template <typename In, typename Out>
struct MaybeVoidCaller {
// Calls the function converting a Void input to an empty parameter list and a void return
// value to a Void output.
template <typename Func>
static inline Out apply(Func& func, In&& in) {
return func(kj::mv(in));
}
};
template <typename In, typename Out>
struct MaybeVoidCaller<In&, Out> {
template <typename Func>
static inline Out apply(Func& func, In& in) {
return func(in);
}
};
template <typename Out>
struct MaybeVoidCaller<Void, Out> {
template <typename Func>
static inline Out apply(Func& func, Void&& in) {
return func();
}
};
template <typename In>
struct MaybeVoidCaller<In, Void> {
template <typename Func>
static inline Void apply(Func& func, In&& in) {
func(kj::mv(in));
return Void();
}
};
template <typename In>
struct MaybeVoidCaller<In&, Void> {
template <typename Func>
static inline Void apply(Func& func, In& in) {
func(in);
return Void();
}
};
template <>
struct MaybeVoidCaller<Void, Void> {
template <typename Func>
static inline Void apply(Func& func, Void&& in) {
func();
return Void();
}
};
template <typename T>
inline T&& returnMaybeVoid(T&& t) {
return kj::fwd<T>(t);
}
inline void returnMaybeVoid(Void&& v) {}
class ExceptionOrValue;
class PromiseNode;
class ChainPromiseNode;
template <typename T>
class ForkHub;
class TaskSetImpl;
class Event;
} // namespace _ (private)
// =======================================================================================
// ***************************************************************************************
// User-relevant interfaces start here.
// ***************************************************************************************
// =======================================================================================
template <typename Func, typename T> template <typename Func, typename T>
using PromiseForResult = Promise<_::JoinPromises<_::ReturnType<Func, T>>>; using PromiseForResult = Promise<_::JoinPromises<_::ReturnType<Func, T>>>;
// Evaluates to the type of Promise for the result of calling functor type Func with parameter type // Evaluates to the type of Promise for the result of calling functor type Func with parameter type
// T. If T is void, then the promise is for the result of calling Func with no arguments. If // T. If T is void, then the promise is for the result of calling Func with no arguments. If
// Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>. // Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
class EventLoop { // =======================================================================================
// Represents a queue of events being executed in a loop. Most code won't interact with // Promises
// EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
// documentation for `Promise`.
//
// Each thread can have at most one EventLoop. When an EventLoop is created, it becomes the
// default loop for the current thread. Async APIs require that the thread has a current
// EventLoop, or they will throw exceptions.
//
// Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
// in the main() function, or in the start function of a thread. You can then use it to
// construct some promises and wait on the result. Example:
//
// int main() {
// // `loop` becomes the official EventLoop for the thread.
// SimpleEventLoop loop;
//
// // Now we can call an async function.
// Promise<String> textPromise = getHttp("http://example.com");
//
// // And we can wait for the promise to complete. Note that you can only use `wait()`
// // from the top level, not from inside a promise callback.
// String text = textPromise.wait();
// print(text);
// return 0;
// }
class EventJob;
public:
EventLoop();
~EventLoop() noexcept(false);
static EventLoop& current();
// Get the event loop for the current thread. Throws an exception if no event loop is active.
bool isCurrent() const;
// Is this EventLoop the current one for this thread? This can safely be called from any thread.
void runForever() KJ_NORETURN;
// Runs the loop forever. Useful for servers.
protected:
// -----------------------------------------------------------------
// Subclasses should implement these.
virtual void prepareToSleep() noexcept = 0;
// Called just before `sleep()`. After calling this, the caller checks if any events are
// scheduled. If so, it calls `wake()`. Then, whether or not events were scheduled, it calls
// `sleep()`. Thus, `prepareToSleep()` is always followed by exactly one call to `sleep()`.
virtual void sleep() = 0;
// Do not return until `wake()` is called. Always preceded by a call to `prepareToSleep()`.
virtual void wake() const = 0;
// Cancel any calls to sleep() that occurred *after* the last call to `prepareToSleep()`.
// May be called from a different thread. The interaction with `prepareToSleep()` is important:
// a `wake()` may occur between a call to `prepareToSleep()` and `sleep()`, in which case
// the subsequent `sleep()` must return immediately. `wake()` may be called any time an event
// is armed; it should return quickly if the loop isn't prepared to sleep.
private:
bool running = false;
// True while looping -- wait() is then not allowed.
_::Event* head = nullptr;
_::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head;
Own<_::TaskSetImpl> daemons;
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thenImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result);
// Run the event loop until `node` is fulfilled, and then `get()` its result into `result`.
Promise<void> yield();
// Returns a promise that won't resolve until all events currently on the queue are fired.
// Otherwise, returns an already-resolved promise. Used to implement evalLater().
template <typename T>
T wait(Promise<T>&& promise);
template <typename Func>
PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
void daemonize(kj::Promise<void>&& promise);
template <typename>
friend class Promise;
friend Promise<void> yield();
template <typename ErrorFunc>
friend void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler);
template <typename Func>
friend PromiseForResult<Func, void> evalLater(Func&& func);
friend class _::Event;
};
// -------------------------------------------------------------------
class SimpleEventLoop final: public EventLoop {
// A simple EventLoop implementation that does not know how to wait for any external I/O.
public:
SimpleEventLoop();
~SimpleEventLoop() noexcept(false);
protected:
void prepareToSleep() noexcept override;
void sleep() override;
void wake() const override;
private:
mutable int preparedToSleep = 0;
#if !KJ_USE_FUTEX
mutable pthread_mutex_t mutex;
mutable pthread_cond_t condvar;
#endif
};
// -------------------------------------------------------------------
class PromiseBase {
public:
PromiseBase(PromiseBase&&) = default;
PromiseBase& operator=(PromiseBase&&) = default;
inline ~PromiseBase() { absolve(); }
void absolve();
// Explicitly cancel the async operation and free all related local data structures. This is
// called automatically by Promise's destructor, but is sometimes useful to call explicitly.
//
// Any exceptions thrown by destructors of objects that were handling the async operation will
// be caught and discarded, on the assumption that such exceptions are a side-effect of deleting
// these structures while they were in the middle of doing something. Presumably, you do not
// care. In contrast, if you were to call `then()` or `wait()`, such exceptions would be caught
// and propagated.
kj::String trace();
// Dump debug info about this promise.
private:
Own<_::PromiseNode> node;
PromiseBase() = default;
PromiseBase(Own<_::PromiseNode>&& node): node(kj::mv(node)) {}
friend class EventLoop;
friend class _::ChainPromiseNode;
template <typename>
friend class Promise;
friend class _::TaskSetImpl;
};
template <typename T> template <typename T>
class Promise: public PromiseBase { class Promise: protected _::PromiseBase {
// The basic primitive of asynchronous computation in KJ. Similar to "futures", but more // The basic primitive of asynchronous computation in KJ. Similar to "futures", but designed
// powerful. Similar to E promises and JavaScript Promises/A. // specifically for event loop concurrency. Similar to E promises and JavaScript Promises/A.
// //
// A Promise represents a promise to produce a value of type T some time in the future. Once // A Promise represents a promise to produce a value of type T some time in the future. Once
// that value has been produced, the promise is "fulfilled". Alternatively, a promise can be // that value has been produced, the promise is "fulfilled". Alternatively, a promise can be
// "broken", with an Exception describing what went wrong. You may implicitly convert a value of // "broken", with an Exception describing what went wrong. You may implicitly convert a value of
// type T to an already-fulfilled Promise<T>. You may implicitly convert the constant // type T to an already-fulfilled Promise<T>. You may implicitly convert the constant
// `kj::READY_NOW` to an already-fulfilled Promise<void>. // `kj::READY_NOW` to an already-fulfilled Promise<void>. You may also implicitly convert a
// `kj::Exception` to an already-broken promise of any type.
// //
// Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed // Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed
// or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations // or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
...@@ -376,12 +92,11 @@ class Promise: public PromiseBase { ...@@ -376,12 +92,11 @@ class Promise: public PromiseBase {
// return count; // return count;
// }); // });
// //
// For `then()` to work, the current thread must be looping in an `EventLoop`. Each callback // For `then()` to work, the current thread must have an active `EventLoop`. Each callback
// is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current // is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current
// thread's event loop, you do not need to worry about two callbacks running at the same time. // thread's event loop, you do not need to worry about two callbacks running at the same time.
// If you explicitly _want_ a callback to run on some other thread, you can schedule it there // You will need to set up at least one `EventLoop` at the top level of your program before you
// using the `EventLoop` interface. You will need to set up at least one `EventLoop` at the top // can use promises.
// level of your program before you can use promises.
// //
// To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`. // To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`.
// //
...@@ -392,7 +107,9 @@ class Promise: public PromiseBase { ...@@ -392,7 +107,9 @@ class Promise: public PromiseBase {
// of calls. It is suggested that any class T which supports pipelining implement a subclass of // of calls. It is suggested that any class T which supports pipelining implement a subclass of
// Promise<T> which adds "eventual send" methods -- methods which, when called, say "please // Promise<T> which adds "eventual send" methods -- methods which, when called, say "please
// invoke the corresponding method on the promised value once it is available". These methods // invoke the corresponding method on the promised value once it is available". These methods
// should in turn return promises for the eventual results of said invocations. // should in turn return promises for the eventual results of said invocations. Cap'n Proto,
// for example, implements the type `RemotePromise` which supports pipelining RPC requests -- see
// `capnp/capability.h`.
// //
// KJ Promises are based on E promises: // KJ Promises are based on E promises:
// http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises // http://wiki.erights.org/wiki/Walnut/Distributed_Computing#Promises
...@@ -429,10 +146,7 @@ public: ...@@ -429,10 +146,7 @@ public:
// continuation is always executed on the same EventLoop (and, therefore, the same thread) which // continuation is always executed on the same EventLoop (and, therefore, the same thread) which
// called `then()`, therefore no synchronization is necessary on state shared by the continuation // called `then()`, therefore no synchronization is necessary on state shared by the continuation
// and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws // and the surrounding scope. If no EventLoop is running on the current thread, `then()` throws
// an exception; in this case you will have to find an explicit EventLoop instance and use // an exception.
// its `there()` method to schedule the continuation to occur in that loop.
// `promise.then(...)` is mostly-equivalent to `EventLoop::current().there(kj::mv(promise), ...)`,
// except for some scheduling differences described below.
// //
// You may also specify an error handler continuation as the second parameter. `errorHandler` // You may also specify an error handler continuation as the second parameter. `errorHandler`
// must be a functor taking a parameter of type `kj::Exception&&`. It must return the same // must be a functor taking a parameter of type `kj::Exception&&`. It must return the same
...@@ -441,33 +155,26 @@ public: ...@@ -441,33 +155,26 @@ public:
// exception to the returned promise. // exception to the returned promise.
// //
// Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise // Either `func` or `errorHandler` may, of course, throw an exception, in which case the promise
// is broken. When compiled with -fno-exceptions, the framework will detect when a non-fatal // is broken. When compiled with -fno-exceptions, the framework will still detect when a
// exception was thrown inside of a continuation and will consider the promise broken even though // recoverable exception was thrown inside of a continuation and will consider the promise
// a (presumably garbage) result was returned. // broken even though a (presumably garbage) result was returned.
//
// If the returned promise is destroyed before the callback runs, the callback will be canceled
// (it will never run).
// //
// Note that `then()` consumes the promise on which it is called, in the sense of move semantics. // Note that `then()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid, but `then()` returns a new promise. // After returning, the original promise is no longer valid, but `then()` returns a new promise.
// If we were targetting GCC 4.8 / Clang 3.3, this method would be rvalue-qualified; we may
// change it to be so in the future.
//
// If the returned promise is destroyed before the callback runs, the callback will be canceled.
// If the returned promise is destroyed while the callback is running in another thread, the
// destructor will block until the callback completes. Additionally, canceling the returned
// promise will transitively cancel the input promise, if it hasn't already completed. Or, if
// `func()` already ran and returned another promise, then canceling the returned promise
// transitively cancels that promise. In short, once a Promise's destructor completes, you can
// assume that any asynchronous operation it was performing has ceased (at least, locally; stuff
// may still be happening on some remote machine).
// //
// *Advanced implementation tips:* Most users will never need to worry about the below, but // *Advanced implementation tips:* Most users will never need to worry about the below, but
// it is good to be aware of. // it is good to be aware of.
// //
// As an optimization, if the callback function `func` does _not_ return another promise, then // As an optimization, if the callback function `func` does _not_ return another promise, then
// execution of `func` itself may be delayed until its result is known to be needed. The // execution of `func` itself may be delayed until its result is known to be needed. The
// here expectation is that `func` is just doing some transformation on the results, not // expectation here is that `func` is just doing some transformation on the results, not
// scheduling any other actions, therefore the system doesn't need to be proactive about // scheduling any other actions, therefore the system doesn't need to be proactive about
// evaluating it. This way, a chain of trivial then() transformations can be executed all at // evaluating it. This way, a chain of trivial then() transformations can be executed all at
// once without repeatedly re-scheduling through the event loop. // once without repeatedly re-scheduling through the event loop. Use the `eagerlyEvaluate()`
// method to suppress this behavior.
// //
// On the other hand, if `func` _does_ return another promise, then the system evaluates `func` // On the other hand, if `func` _does_ return another promise, then the system evaluates `func`
// as soon as possible, because the promise it returns might be for a newly-scheduled // as soon as possible, because the promise it returns might be for a newly-scheduled
...@@ -478,17 +185,8 @@ public: ...@@ -478,17 +185,8 @@ public:
// This allows a long chain of `then`s to execute all at once, improving cache locality by // This allows a long chain of `then`s to execute all at once, improving cache locality by
// clustering operations on the same data. However, this implies that starvation can occur // clustering operations on the same data. However, this implies that starvation can occur
// if a chain of `then()`s takes a very long time to execute without ever stopping to wait for // if a chain of `then()`s takes a very long time to execute without ever stopping to wait for
// actual I/O. To solve this, use `EventLoop::current()`'s `evalLater()` method to yield // actual I/O. To solve this, use `kj::evalLater()` to yield control; this way, all other events
// control; this way, all other events in the queue will get a chance to run before your callback // in the queue will get a chance to run before your callback is executed.
// is executed.
//
// `EventLoop::there()` behaves like `then()` when called on the current thread's EventLoop.
// However, when used to schedule work on some other thread's loop, `there()` does _not_ schedule
// preemptively as this would make the ordering of events unpredictable. Also, again only when
// scheduling cross-thread, `there()` will always evaluate the continuation eagerly even if
// nothing is waiting on the returned promise, on the assumption that it is being used to
// schedule a long-running computation in another thread. In other words, when scheduling
// cross-thread, both of the "optimizations" described above are avoided.
T wait(); T wait();
// Run the event loop until the promise is fulfilled, then return its result. If the promise // Run the event loop until the promise is fulfilled, then return its result. If the promise
...@@ -509,6 +207,9 @@ public: ...@@ -509,6 +207,9 @@ public:
// use `then()` to set an appropriate handler for the exception case, so that the promise you // use `then()` to set an appropriate handler for the exception case, so that the promise you
// actually wait on never throws. // actually wait on never throws.
// //
// Note that `wait()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an // TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event. // event.
...@@ -517,6 +218,9 @@ public: ...@@ -517,6 +218,9 @@ public:
// `T` must be copy-constructable for this to work. Or, in the special case where `T` is // `T` must be copy-constructable for this to work. Or, in the special case where `T` is
// `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
// (or an equivalent) object (probably implemented via reference counting). // (or an equivalent) object (probably implemented via reference counting).
//
// Note that `fork()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid.
void exclusiveJoin(Promise<T>&& other); void exclusiveJoin(Promise<T>&& other);
// Replace this promise with one that resolves when either the original promise resolves or // Replace this promise with one that resolves when either the original promise resolves or
...@@ -537,6 +241,25 @@ public: ...@@ -537,6 +241,25 @@ public:
// for awhile without consuming the result, but you want to make sure that the system actually // for awhile without consuming the result, but you want to make sure that the system actually
// processes it. // processes it.
template <typename ErrorFunc>
void daemonize(ErrorFunc&& errorHandler);
// Allows the promise to continue running in the background until it completes or the
// `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
// promise, you need to make sure that the promise owns all the objects it touches or make sure
// those objects outlive the EventLoop.
//
// `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
// `then()`, except that it must return void.
//
// This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
// canceled unless the callee explicitly permits it.
//
// Note that `daemonize()` consumes the promise on which it is called, in the sense of move
// semantics. After returning, the original promise is no longer valid.
kj::String trace();
// Returns a dump of debug info about this promise. Not for production use. Requires RTTI.
private: private:
Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {} Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {}
// Second parameter prevent ambiguity with immediate-value constructor. // Second parameter prevent ambiguity with immediate-value constructor.
...@@ -550,6 +273,7 @@ private: ...@@ -550,6 +273,7 @@ private:
friend PromiseFulfillerPair<U> newPromiseAndFulfiller(); friend PromiseFulfillerPair<U> newPromiseAndFulfiller();
template <typename> template <typename>
friend class _::ForkHub; friend class _::ForkHub;
friend class _::TaskSetImpl;
}; };
template <typename T> template <typename T>
...@@ -572,39 +296,6 @@ private: ...@@ -572,39 +296,6 @@ private:
friend class EventLoop; friend class EventLoop;
}; };
class TaskSet {
// Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
// associated with each promise is automatically freed when the promise completes. Destroying
// the TaskSet itself automatically cancels all unfinished promises.
//
// This is useful for "daemon" objects that perform background tasks which aren't intended to
// fulfill any particular external promise. The daemon object holds a TaskSet to collect these
// tasks it is working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed
// as well, and everything the daemon is doing is canceled. (The only alternative -- creating
// a promise that owns itself and deletes itself on completion -- does not allow for clean
// shutdown.)
public:
class ErrorHandler {
public:
virtual void taskFailed(kj::Exception&& exception) = 0;
};
TaskSet(ErrorHandler& errorHandler);
// `loop` will be used to wait on promises. `errorHandler` will be executed any time a task
// throws an exception, and will execute within the given EventLoop.
~TaskSet() noexcept(false);
void add(Promise<void>&& promise);
kj::String trace();
// Return debug info about all promises currently in the TaskSet.
private:
Own<_::TaskSetImpl> impl;
};
constexpr _::Void READY_NOW = _::Void(); constexpr _::Void READY_NOW = _::Void();
// Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly // Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
// cast to `Promise<void>`. // cast to `Promise<void>`.
...@@ -623,20 +314,7 @@ PromiseForResult<Func, void> evalLater(Func&& func); ...@@ -623,20 +314,7 @@ PromiseForResult<Func, void> evalLater(Func&& func);
// //
// If you schedule several evaluations with `evalLater`, they will be executed in order. // If you schedule several evaluations with `evalLater`, they will be executed in order.
template <typename ErrorFunc> // =======================================================================================
void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler);
// Allows the given promise to continue running in the background until it completes or the
// `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
// promise, you need to make sure that the promise owns all the objects it touches or make sure
// those objects outlive the EventLoop.
//
// `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to `then()`,
// except that it must return void.
//
// This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
// canceled unless the callee explicitly permits it.
// -------------------------------------------------------------------
// Hack for creating a lambda that holds an owned pointer. // Hack for creating a lambda that holds an owned pointer.
template <typename Func, typename MovedParam> template <typename Func, typename MovedParam>
...@@ -671,7 +349,7 @@ inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func ...@@ -671,7 +349,7 @@ inline CaptureByMove<Func, Decay<MovedParam>> mvCapture(MovedParam&& param, Func
return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param)); return CaptureByMove<Func, Decay<MovedParam>>(kj::fwd<Func>(func), kj::mv(param));
} }
// ------------------------------------------------------------------- // =======================================================================================
// Advanced promise construction // Advanced promise construction
template <typename T> template <typename T>
...@@ -701,9 +379,6 @@ public: ...@@ -701,9 +379,6 @@ public:
template <> template <>
class PromiseFulfiller<void> { class PromiseFulfiller<void> {
// Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>. // Specialization of PromiseFulfiller for void promises. See PromiseFulfiller<T>.
//
// It is safe to call a PromiseFulfiller from any thread, as long as you only call it from one
// thread at a time.
public: public:
virtual void fulfill(_::Void&& value = _::Void()) = 0; virtual void fulfill(_::Void&& value = _::Void()) = 0;
...@@ -732,8 +407,7 @@ Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams); ...@@ -732,8 +407,7 @@ Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams);
// The adapter is destroyed when its owning Promise is destroyed. This may occur before the // The adapter is destroyed when its owning Promise is destroyed. This may occur before the
// Promise has been fulfilled. In this case, the adapter's destructor should cancel the // Promise has been fulfilled. In this case, the adapter's destructor should cancel the
// asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be // asynchronous operation. Once the adapter is destroyed, the fulfillment callback cannot be
// called. If the callback may already be in progress in another thread, then the destructor // called.
// must block until the callback returns.
// //
// An adapter implementation should be carefully written to ensure that it cannot accidentally // An adapter implementation should be carefully written to ensure that it cannot accidentally
// be left unfulfilled permanently because of an exception. Consider making liberal use of // be left unfulfilled permanently because of an exception. Consider making liberal use of
...@@ -760,754 +434,166 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller(); ...@@ -760,754 +434,166 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller();
// `fulfill()` callback, and the promises are chained. // `fulfill()` callback, and the promises are chained.
// ======================================================================================= // =======================================================================================
// internal implementation details follow // TaskSet
namespace _ { // private class TaskSet {
// Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
template <typename T> // associated with each promise is automatically freed when the promise completes. Destroying
class ExceptionOr; // the TaskSet itself automatically cancels all unfinished promises.
//
// This is useful for "daemon" objects that perform background tasks which aren't intended to
// fulfill any particular external promise, but which may need to be canceled (and thus can't
// use `Promise::daemonize()`). The daemon object holds a TaskSet to collect these tasks it is
// working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed as well,
// and everything the daemon is doing is canceled.
class ExceptionOrValue {
public: public:
ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {} class ErrorHandler {
KJ_DISALLOW_COPY(ExceptionOrValue); public:
virtual void taskFailed(kj::Exception&& exception) = 0;
};
void addException(Exception&& exception) { TaskSet(ErrorHandler& errorHandler);
if (this->exception == nullptr) { // `loop` will be used to wait on promises. `errorHandler` will be executed any time a task
this->exception = kj::mv(exception); // throws an exception, and will execute within the given EventLoop.
}
}
template <typename T>
ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
template <typename T>
const ExceptionOr<T>& as() const { return *static_cast<const ExceptionOr<T>*>(this); }
Maybe<Exception> exception; ~TaskSet() noexcept(false);
protected: void add(Promise<void>&& promise);
// Allow subclasses to have move constructor / assignment.
ExceptionOrValue() = default;
ExceptionOrValue(ExceptionOrValue&& other) = default;
ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
};
template <typename T> kj::String trace();
class ExceptionOr: public ExceptionOrValue { // Return debug info about all promises currently in the TaskSet.
public:
ExceptionOr() = default;
ExceptionOr(T&& value): value(kj::mv(value)) {}
ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
ExceptionOr(ExceptionOr&&) = default;
ExceptionOr& operator=(ExceptionOr&&) = default;
Maybe<T> value; private:
Own<_::TaskSetImpl> impl;
}; };
class Event { // =======================================================================================
// An event waiting to be executed. Not for direct use by applications -- promises use this // The EventLoop class
// internally.
public:
Event();
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void armDepthFirst(); class EventLoop {
// Enqueue this event so that `fire()` will be called from the event loop soon. // Represents a queue of events being executed in a loop. Most code won't interact with
// EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
// documentation for `Promise`.
// //
// Events scheduled in this way are executed in depth-first order: if an event callback arms // Each thread can have at most one EventLoop. When an EventLoop is created, it becomes the
// more events, those events are placed at the front of the queue (in the order in which they // default loop for the current thread. Async APIs require that the thread has a current
// were armed), so that they run immediately after the first event's callback returns. // EventLoop, or they will throw exceptions.
// //
// Depth-first event scheduling is appropriate for events that represent simple continuations // Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
// of a previous event that should be globbed together for performance. Depth-first scheduling // in the main() function, or in the start function of a thread. You can then use it to
// can lead to starvation, so any long-running task must occasionally yield with // construct some promises and wait on the result. Example:
// `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
// breadth-first.)
// //
// To use breadth-first scheduling instead, use `armLater()`. // int main() {
// // `loop` becomes the official EventLoop for the thread.
void armBreadthFirst(); // SimpleEventLoop loop;
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
kj::String trace();
// Dump debug info about this event.
virtual _::PromiseNode* getInnerForTrace();
// If this event wraps a PromiseNode, get that node. Used for debug tracing.
// Default implementation returns nullptr.
protected:
virtual Maybe<Own<Event>> fire() = 0;
// Fire the event. Possibly returns a pointer to itself, which will be discarded by the
// caller. This is the only way that an event can delete itself as a result of firing, as
// doing so from within fire() will throw an exception.
private:
friend class kj::EventLoop;
EventLoop& loop;
Event* next;
Event** prev;
bool firing = false;
};
class PromiseNode {
// A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
// //
// To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky // // Now we can call an async function.
// use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only // Promise<String> textPromise = getHttp("http://example.com");
// so down-cast in the few places that really need to be templated. Luckily this is all //
// internal implementation details. // // And we can wait for the promise to complete. Note that you can only use `wait()`
// // from the top level, not from inside a promise callback.
public: // String text = textPromise.wait();
virtual bool onReady(Event& event) noexcept = 0; // print(text);
// Returns true if already ready, otherwise arms the given event when ready. // return 0;
// }
virtual void get(ExceptionOrValue& output) noexcept = 0;
// Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
// Can only be called once, and only after the node is ready. Must be called directly from the
// event loop, with no application code on the stack.
virtual PromiseNode* getInnerForTrace();
// If this node wraps some other PromiseNode, get the wrapped node. Used for debug tracing.
// Default implementation returns nullptr.
protected:
class OnReadyEvent {
// Helper class for implementing onReady().
public:
bool init(Event& newEvent);
// Returns true if arm() was already called.
void arm();
// Arms the event if init() has already been called and makes future calls to init() return
// true.
private:
Event* event = nullptr;
};
};
// -------------------------------------------------------------------
class ImmediatePromiseNodeBase: public PromiseNode {
public:
bool onReady(Event& event) noexcept override;
};
template <typename T>
class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
// A promise that has already been resolved to an immediate value or exception.
public:
ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
public:
ImmediateBrokenPromiseNode(Exception&& exception);
void get(ExceptionOrValue& output) noexcept override;
private:
Exception exception;
};
// -------------------------------------------------------------------
class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
void dropDependency();
template <typename>
friend class AttachmentPromiseNode;
};
template <typename Attachment>
class AttachmentPromiseNode final: public AttachmentPromiseNodeBase {
// A PromiseNode that holds on to some object (usually, an Own<T>, but could be any movable
// object) until the promise resolves.
public:
AttachmentPromiseNode(Own<PromiseNode>&& dependency, Attachment&& attachment)
: AttachmentPromiseNodeBase(kj::mv(dependency)),
attachment(kj::mv<Attachment>(attachment)) {}
~AttachmentPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the attachment because the
// dependency may be using the attachment.
dropDependency();
}
private:
Attachment attachment;
};
// -------------------------------------------------------------------
class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
void dropDependency();
void getDepResult(ExceptionOrValue& output);
virtual void getImpl(ExceptionOrValue& output) = 0;
template <typename, typename, typename, typename>
friend class TransformPromiseNode;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc> class EventJob;
class TransformPromiseNode final: public TransformPromiseNodeBase {
// A PromiseNode that transforms the result of another PromiseNode through an application-provided
// function (implements `then()`).
public: public:
TransformPromiseNode(Own<PromiseNode>&& dependency, Func&& func, ErrorFunc&& errorHandler) EventLoop();
: TransformPromiseNodeBase(kj::mv(dependency)), ~EventLoop() noexcept(false);
func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
~TransformPromiseNode() noexcept(false) {
// We need to make sure the dependency is deleted before we delete the continuations because it
// is a common pattern for the continuations to hold ownership of objects that might be in-use
// by the dependency.
dropDependency();
}
private:
Func func;
ErrorFunc errorHandler;
void getImpl(ExceptionOrValue& output) override {
ExceptionOr<DepT> depResult;
getDepResult(depResult);
KJ_IF_MAYBE(depException, depResult.exception) {
output.as<T>() = handle(
MaybeVoidCaller<Exception, FixVoid<ReturnType<ErrorFunc, Exception>>>::apply(
errorHandler, kj::mv(*depException)));
} else KJ_IF_MAYBE(depValue, depResult.value) {
output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
}
}
ExceptionOr<T> handle(T&& value) {
return kj::mv(value);
}
ExceptionOr<T> handle(PropagateException::Bottom&& value) {
return ExceptionOr<T>(false, value.asException());
}
};
// -------------------------------------------------------------------
class ForkHubBase;
class ForkBranchBase: public PromiseNode { static EventLoop& current();
public: // Get the event loop for the current thread. Throws an exception if no event loop is active.
ForkBranchBase(Own<ForkHubBase>&& hub);
~ForkBranchBase() noexcept(false);
void hubReady() noexcept; bool isCurrent() const;
// Called by the hub to indicate that it is ready. // Is this EventLoop the current one for this thread? This can safely be called from any thread.
// implements PromiseNode ------------------------------------------ void runForever() KJ_NORETURN;
bool onReady(Event& event) noexcept override; // Runs the loop forever. Useful for servers.
PromiseNode* getInnerForTrace() override;
protected: protected:
inline ExceptionOrValue& getHubResultRef(); // -----------------------------------------------------------------
// Subclasses should implement these.
void releaseHub(ExceptionOrValue& output);
// Release the hub. If an exception is thrown, add it to `output`.
private:
OnReadyEvent onReadyEvent;
Own<ForkHubBase> hub;
ForkBranchBase* next = nullptr;
ForkBranchBase** prevPtr = nullptr;
friend class ForkHubBase;
};
template <typename T> T copyOrAddRef(T& t) { return t; }
template <typename T> Own<T> copyOrAddRef(Own<T>& t) { return t->addRef(); }
template <typename T>
class ForkBranch final: public ForkBranchBase {
// A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
// a const reference.
public:
ForkBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
void get(ExceptionOrValue& output) noexcept override {
ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
KJ_IF_MAYBE(value, hubResult.value) {
output.as<T>().value = copyOrAddRef(*value);
} else {
output.as<T>().value = nullptr;
}
output.exception = hubResult.exception;
releaseHub(output);
}
};
// -------------------------------------------------------------------
class ForkHubBase: public Refcounted, protected Event {
public:
ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef);
inline ExceptionOrValue& getResultRef() { return resultRef; }
private:
Own<PromiseNode> inner;
ExceptionOrValue& resultRef;
ForkBranchBase* headBranch = nullptr;
ForkBranchBase** tailBranch = &headBranch;
// Tail becomes null once the inner promise is ready and all branches have been notified.
Maybe<Own<Event>> fire() override;
_::PromiseNode* getInnerForTrace() override;
friend class ForkBranchBase;
};
template <typename T>
class ForkHub final: public ForkHubBase {
// A PromiseNode that implements the hub of a fork. The first call to Promise::fork() replaces
// the promise's outer node with a ForkHub, and subsequent calls add branches to that hub (if
// possible).
public:
ForkHub(Own<PromiseNode>&& inner): ForkHubBase(kj::mv(inner), result) {}
Promise<_::UnfixVoid<T>> addBranch() {
return Promise<_::UnfixVoid<T>>(false, kj::heap<ForkBranch<T>>(addRef(*this)));
}
private:
ExceptionOr<T> result;
};
inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
return hub->getResultRef();
}
// ------------------------------------------------------------------- virtual void prepareToSleep() noexcept = 0;
// Called just before `sleep()`. After calling this, the caller checks if any events are
// scheduled. If so, it calls `wake()`. Then, whether or not events were scheduled, it calls
// `sleep()`. Thus, `prepareToSleep()` is always followed by exactly one call to `sleep()`.
class ChainPromiseNode final: public PromiseNode, private Event { virtual void sleep() = 0;
public: // Do not return until `wake()` is called. Always preceded by a call to `prepareToSleep()`.
explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override; virtual void wake() const = 0;
void get(ExceptionOrValue& output) noexcept override; // Cancel any calls to sleep() that occurred *after* the last call to `prepareToSleep()`.
PromiseNode* getInnerForTrace() override; // May be called from a different thread. The interaction with `prepareToSleep()` is important:
// a `wake()` may occur between a call to `prepareToSleep()` and `sleep()`, in which case
// the subsequent `sleep()` must return immediately. `wake()` may be called any time an event
// is armed; it should return quickly if the loop isn't prepared to sleep.
private: private:
enum State { bool running = false;
STEP1, // True while looping -- wait() is then not allowed.
STEP2
};
State state;
Own<PromiseNode> inner;
// In PRE_STEP1 / STEP1, a PromiseNode for a Promise<T>.
// In STEP2, a PromiseNode for a T.
Event* onReadyEvent = nullptr;
Maybe<Own<Event>> fire() override;
};
template <typename T>
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, Promise<T>*) {
return heap<ChainPromiseNode>(kj::mv(node));
}
template <typename T> _::Event* head = nullptr;
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) { _::Event** tail = &head;
return kj::mv(node); _::Event** depthFirstInsertPoint = &head;
}
// ------------------------------------------------------------------- Own<_::TaskSetImpl> daemons;
class ExclusiveJoinPromiseNode final: public PromiseNode { template <typename T, typename Func, typename ErrorFunc>
public: Own<_::PromiseNode> thenImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler);
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false);
bool onReady(Event& event) noexcept override; void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result);
void get(ExceptionOrValue& output) noexcept override; // Run the event loop until `node` is fulfilled, and then `get()` its result into `result`.
PromiseNode* getInnerForTrace() override;
private: Promise<void> yield();
class Branch: public Event { // Returns a promise that won't resolve until all events currently on the queue are fired.
public: // Otherwise, returns an already-resolved promise. Used to implement evalLater().
Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency);
~Branch() noexcept(false);
bool get(ExceptionOrValue& output); template <typename T>
// Returns true if this is the side that finished. T wait(Promise<T>&& promise);
Maybe<Own<Event>> fire() override; template <typename Func>
_::PromiseNode* getInnerForTrace() override; PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
private: void daemonize(kj::Promise<void>&& promise);
ExclusiveJoinPromiseNode& joinNode;
Own<PromiseNode> dependency;
};
Branch left; template <typename>
Branch right; friend class Promise;
OnReadyEvent onReadyEvent; friend Promise<void> yield();
template <typename ErrorFunc>
friend void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler);
template <typename Func>
friend PromiseForResult<Func, void> evalLater(Func&& func);
friend class _::Event;
}; };
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected Event { class SimpleEventLoop final: public EventLoop {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly // A simple EventLoop implementation that does not know how to wait for any external I/O.
// evaluate it.
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
bool onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
Own<PromiseNode> dependency;
OnReadyEvent onReadyEvent;
ExceptionOrValue& resultRef;
Maybe<Own<Event>> fire() override;
};
template <typename T>
class EagerPromiseNode final: public EagerPromiseNodeBase {
public:
EagerPromiseNode(Own<PromiseNode>&& dependency)
: EagerPromiseNodeBase(kj::mv(dependency), result) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
template <typename T>
Own<PromiseNode> spark(Own<PromiseNode>&& node) {
// Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
// on it.
return heap<EagerPromiseNode<T>>(kj::mv(node));
}
// -------------------------------------------------------------------
class AdapterPromiseNodeBase: public PromiseNode {
public: public:
bool onReady(Event& event) noexcept override; SimpleEventLoop();
~SimpleEventLoop() noexcept(false);
protected: protected:
inline void setReady() { void prepareToSleep() noexcept override;
onReadyEvent.arm(); void sleep() override;
} void wake() const override;
private:
OnReadyEvent onReadyEvent;
};
template <typename T, typename Adapter>
class AdapterPromiseNode final: public AdapterPromiseNodeBase,
private PromiseFulfiller<UnfixVoid<T>> {
// A PromiseNode that wraps a PromiseAdapter.
public:
template <typename... Params>
AdapterPromiseNode(Params&&... params)
: adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
void get(ExceptionOrValue& output) noexcept override {
KJ_IREQUIRE(!isWaiting());
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
bool waiting = true;
Adapter adapter;
void fulfill(T&& value) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(kj::mv(value));
setReady();
}
}
void reject(Exception&& exception) override {
if (waiting) {
waiting = false;
result = ExceptionOr<T>(false, kj::mv(exception));
setReady();
}
}
bool isWaiting() override {
return waiting;
}
};
} // namespace _ (private)
// =======================================================================================
template <typename T>
T EventLoop::wait(Promise<T>&& promise) {
_::ExceptionOr<_::FixVoid<T>> result;
waitImpl(kj::mv(promise.node), result);
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
template <typename Func>
PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) {
// Invoke thenImpl() on yield(). Always spark the result.
return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
thenImpl(yield(), kj::fwd<Func>(func), _::PropagateException())));
}
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> EventLoop::thenImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler) {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
kj::mv(promise.node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return _::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr));
}
template <typename T>
Promise<T>::Promise(_::FixVoid<T> value)
: PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
template <typename T>
Promise<T>::Promise(kj::Exception&& exception)
: PromiseBase(heap<_::ImmediateBrokenPromiseNode>(kj::mv(exception))) {}
template <typename T>
template <typename Func, typename ErrorFunc>
PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) {
return PromiseForResult<Func, T>(false, EventLoop::current().thenImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)));
}
template <typename T>
T Promise<T>::wait() {
return EventLoop::current().wait(kj::mv(*this));
}
template <typename T>
ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
}
template <typename T>
Promise<T> ForkedPromise<T>::addBranch() {
return hub->addBranch();
}
template <typename T>
void Promise<T>::exclusiveJoin(Promise<T>&& other) {
node = heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node));
}
template <typename T>
template <typename... Attachments>
void Promise<T>::attach(Attachments&&... attachments) {
node = kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...));
}
template <typename T>
void Promise<T>::eagerlyEvaluate() {
node = _::spark<_::FixVoid<T>>(kj::mv(node));
}
template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) {
return EventLoop::current().evalLater(kj::fwd<Func>(func));
}
template <typename ErrorFunc>
void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler) {
return EventLoop::current().daemonize(promise.then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
}
// =======================================================================================
namespace _ { // private
template <typename T>
class WeakFulfiller final: public PromiseFulfiller<T>, private kj::Disposer {
// A wrapper around PromiseFulfiller which can be detached.
//
// There are a couple non-trivialities here:
// - If the WeakFulfiller is discarded, we want the promise it fulfills to be implicitly
// rejected.
// - We cannot destroy the WeakFulfiller until the application has discarded it *and* it has been
// detached from the underlying fulfiller, because otherwise the later detach() call will go
// to a dangling pointer. Essentially, WeakFulfiller is reference counted, although the
// refcount never goes over 2 and we manually implement the refcounting because we already need
// a mutex anyway. To this end, WeakFulfiller is its own Disposer -- dispose() is called when
// the application discards its owned pointer to the fulfiller and detach() is called when the
// promise is destroyed.
public:
KJ_DISALLOW_COPY(WeakFulfiller);
static kj::Own<WeakFulfiller> make() {
WeakFulfiller* ptr = new WeakFulfiller;
return Own<WeakFulfiller>(ptr, *ptr);
}
void fulfill(FixVoid<T>&& value) override {
if (inner != nullptr) {
inner->fulfill(kj::mv(value));
}
}
void reject(Exception&& exception) override {
if (inner != nullptr) {
inner->reject(kj::mv(exception));
}
}
bool isWaiting() override {
return inner != nullptr && inner->isWaiting();
}
void attach(PromiseFulfiller<T>& newInner) {
inner = &newInner;
}
void detach(PromiseFulfiller<T>& from) {
if (inner == nullptr) {
// Already disposed.
delete this;
} else {
KJ_IREQUIRE(inner == &from);
inner = nullptr;
}
}
private:
mutable PromiseFulfiller<T>* inner;
WeakFulfiller(): inner(nullptr) {}
void disposeImpl(void* pointer) const override {
// TODO(perf): Factor some of this out so it isn't regenerated for every fulfiller type?
if (inner == nullptr) {
// Already detached.
delete this;
} else {
if (inner->isWaiting()) {
inner->reject(kj::Exception(
kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__,
kj::heapString("PromiseFulfiller was destroyed without fulfilling the promise.")));
}
inner = nullptr;
}
}
};
template <typename T>
class PromiseAndFulfillerAdapter {
public:
PromiseAndFulfillerAdapter(PromiseFulfiller<T>& fulfiller,
WeakFulfiller<T>& wrapper)
: fulfiller(fulfiller), wrapper(wrapper) {
wrapper.attach(fulfiller);
}
~PromiseAndFulfillerAdapter() noexcept(false) {
wrapper.detach(fulfiller);
}
private: private:
PromiseFulfiller<T>& fulfiller; mutable int preparedToSleep = 0;
WeakFulfiller<T>& wrapper; #if !KJ_USE_FUTEX
mutable pthread_mutex_t mutex;
mutable pthread_cond_t condvar;
#endif
}; };
} // namespace _ (private)
template <typename T, typename Adapter, typename... Params>
Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
return Promise<T>(false, heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
kj::fwd<Params>(adapterConstructorParams)...));
}
template <typename T>
PromiseFulfillerPair<T> newPromiseAndFulfiller() {
auto wrapper = _::WeakFulfiller<T>::make();
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
Promise<_::JoinPromises<T>> promise(false,
_::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr)));
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
}
} // namespace kj } // namespace kj
#include "async-inl.h"
#endif // KJ_ASYNC_H_ #endif // KJ_ASYNC_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment