Commit c0750662 authored by Kenton Varda's avatar Kenton Varda

Reduce generated code for Promises by making most of the node implementations mostly non-templated.

parent 6ff9769f
......@@ -40,7 +40,9 @@ namespace {
thread_local EventLoop* threadLocalEventLoop = nullptr;
class YieldPromiseNode final: public _::PromiseNode<_::Void>, public EventLoop::Event {
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::EventLoop::Event*>(1)
class YieldPromiseNode final: public _::PromiseNode, public EventLoop::Event {
// A PromiseNode used to implement EventLoop::yield().
public:
......@@ -57,8 +59,8 @@ public:
return false;
}
}
_::ExceptionOr<_::Void> get() noexcept override {
return _::Void();
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
}
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
return getEventLoop();
......@@ -74,6 +76,18 @@ private:
EventLoop::Event* onReadyEvent = nullptr;
};
class BoolEvent: public EventLoop::Event {
public:
BoolEvent(const EventLoop& loop): Event(loop) {}
~BoolEvent() { disarm(); }
bool fired = false;
void fire() override {
fired = true;
}
};
} // namespace
EventLoop& EventLoop::current() {
......@@ -91,12 +105,15 @@ EventLoop::EventLoop(): queue(*this) {
queue.prev = &queue;
}
void EventLoop::loopWhile(bool& keepGoing) {
void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result) {
EventLoop* oldEventLoop = threadLocalEventLoop;
threadLocalEventLoop = this;
KJ_DEFER(threadLocalEventLoop = oldEventLoop);
while (keepGoing) {
BoolEvent event(*this);
event.fired = node->onReady(event);
while (!event.fired) {
queue.mutex.lock(_::Mutex::EXCLUSIVE);
// Get the first event in the queue.
......@@ -125,6 +142,9 @@ void EventLoop::loopWhile(bool& keepGoing) {
KJ_DEFER(event->mutex.unlock(_::Mutex::EXCLUSIVE));
event->fire();
}
KJ_DBG(&result);
node->get(result);
}
Promise<void> EventLoop::yield() {
......@@ -211,4 +231,195 @@ void SimpleEventLoop::wake() const {
}
}
// =======================================================================================
void PromiseBase::absolve() {
runCatchingExceptions([this]() { auto deleteMe = kj::mv(node); });
}
namespace _ { // private
bool PromiseNode::atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Event& newEvent) {
// If onReadyEvent is null, atomically set it to point at newEvent and return false.
// If onReadyEvent is _kJ_ALREADY_READY, return true.
// Useful for implementing onReady() thread-safely.
EventLoop::Event* oldEvent = nullptr;
if (__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, &newEvent, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Event was swapped in and will be called later.
return false;
} else {
// `onReadyEvent` is not null. If it is _kJ_ALREADY_READY then this promise was fulfilled
// before any dependent existed, otherwise there is already a different dependent.
KJ_IREQUIRE(oldEvent == _kJ_ALREADY_READY, "onReady() can only be called once.");
return true;
}
}
void PromiseNode::atomicReady(EventLoop::Event*& onReadyEvent) {
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjuction with atomicOnReady().
EventLoop::Event* oldEvent = nullptr;
if (!__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, _kJ_ALREADY_READY, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
oldEvent->arm();
}
}
bool ImmediatePromiseNodeBase::onReady(EventLoop::Event& event) noexcept { return true; }
Maybe<const EventLoop&> ImmediatePromiseNodeBase::getSafeEventLoop() noexcept { return nullptr; }
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
: exception(kj::mv(exception)) {}
void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
output.exception = kj::mv(exception);
}
TransformPromiseNodeBase::TransformPromiseNodeBase(
const EventLoop& loop, Own<PromiseNode>&& dependency)
: loop(loop), dependency(kj::mv(dependency)) {}
bool TransformPromiseNodeBase::onReady(EventLoop::Event& event) noexcept {
return dependency->onReady(event);
}
void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
getImpl(output);
})) {
output.addException(kj::mv(*exception));
}
}
Maybe<const EventLoop&> TransformPromiseNodeBase::getSafeEventLoop() noexcept {
return loop;
}
ChainPromiseNode::ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner)
: Event(loop), state(PRE_STEP1), inner(kj::mv(inner)) {
KJ_IREQUIRE(this->inner->isSafeEventLoop(loop));
arm();
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {
disarm();
}
bool ChainPromiseNode::onReady(EventLoop::Event& event) noexcept {
switch (state) {
case PRE_STEP1:
case STEP1:
KJ_IREQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
onReadyEvent = &event;
return false;
case STEP2:
return inner->onReady(event);
}
KJ_UNREACHABLE;
}
void ChainPromiseNode::get(ExceptionOrValue& output) noexcept {
KJ_IREQUIRE(state == STEP2);
return inner->get(output);
}
Maybe<const EventLoop&> ChainPromiseNode::getSafeEventLoop() noexcept {
return getEventLoop();
}
void ChainPromiseNode::fire() {
if (state == PRE_STEP1 && !inner->onReady(*this)) {
state = STEP1;
return;
}
KJ_IREQUIRE(state != STEP2);
static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
"This code assumes Promise<T> does not add any new members to PromiseBase.");
ExceptionOr<PromiseBase> intermediate;
inner->get(intermediate);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
inner = nullptr;
})) {
intermediate.addException(kj::mv(*exception));
}
KJ_IF_MAYBE(exception, intermediate.exception) {
// There is an exception. If there is also a value, delete it.
kj::runCatchingExceptions([&,this]() { intermediate.value = nullptr; });
// Now set step2 to a rejected promise.
inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception));
} else KJ_IF_MAYBE(value, intermediate.value) {
// There is a value and no exception. The value is itself a promise. Adopt it as our
// step2.
inner = kj::mv(value->node);
} else {
// We can only get here if inner->get() returned neither an exception nor a
// value, which never actually happens.
KJ_IASSERT(false, "Inner node returned empty value.");
}
state = STEP2;
if (onReadyEvent != nullptr) {
if (inner->onReady(*onReadyEvent)) {
onReadyEvent->arm();
}
}
}
CrossThreadPromiseNodeBase::CrossThreadPromiseNodeBase(
const EventLoop& loop, Own<PromiseNode>&& dependent, ExceptionOrValue& resultRef)
: Event(loop), dependent(kj::mv(dependent)), resultRef(resultRef) {
KJ_IREQUIRE(this->dependent->isSafeEventLoop(loop));
// The constructor may be called from any thread, so before we can even call onReady() we need
// to switch threads.
arm();
}
CrossThreadPromiseNodeBase::~CrossThreadPromiseNodeBase() noexcept(false) {
disarm();
}
bool CrossThreadPromiseNodeBase::onReady(EventLoop::Event& event) noexcept {
return PromiseNode::atomicOnReady(onReadyEvent, event);
}
Maybe<const EventLoop&> CrossThreadPromiseNodeBase::getSafeEventLoop() noexcept {
return nullptr;
}
void CrossThreadPromiseNodeBase::fire() {
if (!isWaiting && !this->dependent->onReady(*this)) {
isWaiting = true;
} else {
dependent->get(resultRef);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
auto deleteMe = kj::mv(dependent);
})) {
resultRef.addException(kj::mv(*exception));
}
// If onReadyEvent is null, set it to _kJ_ALREADY_READY. Otherwise, arm it.
PromiseNode::atomicReady(onReadyEvent);
}
}
bool AdapterPromiseNodeBase::onReady(EventLoop::Event& event) noexcept {
return PromiseNode::atomicOnReady(onReadyEvent, event);
}
Maybe<const EventLoop&> AdapterPromiseNodeBase::getSafeEventLoop() noexcept {
// We're careful to be thread-safe so any thread is OK.
return nullptr;
}
} // namespace _ (private)
} // namespace kj
......@@ -30,8 +30,29 @@
namespace kj {
class EventLoop;
class SimpleEventLoop;
template <typename T>
class Promise;
template <typename T>
class PromiseFulfiller;
// =======================================================================================
// ***************************************************************************************
// This section contains various internal stuff that needs to be declared upfront.
// Scroll down to `class EventLoop` or `class Promise` for the public interfaces.
// ***************************************************************************************
// =======================================================================================
// Various internal stuff that needs to be declared upfront. Users should ignore this.
namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; };
template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
......@@ -57,18 +78,6 @@ public:
}
};
template <typename T>
class Promise;
template <typename T> struct PromiseType_ { typedef T Type; };
template <typename T> struct PromiseType_<Promise<T>> { typedef T Type; };
template <typename T>
using PromiseType = typename PromiseType_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
namespace _ { // private
template <typename Func, typename T>
struct ReturnType_ { typedef decltype(instance<Func>()(instance<T>())) Type; };
template <typename Func>
......@@ -80,6 +89,7 @@ using ReturnType = typename ReturnType_<Func, T>::Type;
// T is void, this is the return type of Func called with no arguments.
struct Void {};
// Application code should NOT refer to this! See `kj::READY_NOW` instead.
template <typename T> struct FixVoid_ { typedef T Type; };
template <> struct FixVoid_<void> { typedef Void Type; };
......@@ -131,20 +141,47 @@ inline T&& returnMaybeVoid(T&& t) {
}
inline void returnMaybeVoid(Void&& v) {}
template <typename T>
class ExceptionOrValue;
class PromiseNode;
template <typename T>
class ChainPromiseNode;
} // namespace _ (private)
// =======================================================================================
// ***************************************************************************************
// User-relevant interfaces start here.
// ***************************************************************************************
// =======================================================================================
template <typename Func, typename T>
using PromiseForResult = Promise<_::JoinPromises<_::ReturnType<Func, T>>>;
// Evaluates to the type of Promise for the result of calling functor type Func with parameter type
// T. If T is void, then the promise is for the result of calling Func with no arguments. If
// Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
class EventLoop {
// Represents a queue of events being executed in a loop. Most code won't interact with
// EventLoop directly, but instead use `Promise`s to interact with it indirectly.
// EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
// documentation for `Promise`.
//
// You will need to construct an `EventLoop` at the top level of your program. You can then
// use it to construct some promises and wait on the result. Example:
//
// int main() {
// SimpleEventLoop loop;
//
// // Most code that does I/O needs to be run from within an
// // EventLoop, so it can use Promise::then(). So, we need to
// // use `evalLater()` to run `getHttp()` inside the event
// // loop.
// Promise<String> textPromise = loop.evalLater(
// []() { return getHttp("http://example.com"); });
//
// // Now we can wait for the promise to complete.
// String text = loop.wait(kj::mv(textPromise));
// print(text);
// return 0;
// }
public:
EventLoop();
......@@ -195,7 +232,7 @@ public:
// compared to other tasks occurring concurrently.
template <typename Func>
auto evalLater(Func&& func) const -> Promise<PromiseType<_::ReturnType<Func, void>>>;
auto evalLater(Func&& func) const -> PromiseForResult<Func, void>;
// Schedule for the given zero-parameter function to be executed in the event loop at some
// point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
// a promise, `evalLater()` returns a Promise for the result of resolving that promise.
......@@ -210,10 +247,10 @@ public:
// `evalLater()` is largely equivalent to `there()` called on an already-fulfilled
// `Promise<Void>`.
template <typename T, typename Func, typename ErrorFunc = PropagateException>
template <typename T, typename Func, typename ErrorFunc = _::PropagateException>
auto there(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler = PropagateException()) const
-> Promise<PromiseType<_::ReturnType<Func, T>>>;
ErrorFunc&& errorHandler = _::PropagateException()) const
-> PromiseForResult<Func, T>;
// When the given promise is fulfilled, execute `func` on its result inside this `EventLoop`.
// Returns a promise for the result of `func()` -- or, if `func()` itself returns a promise,
// `there()` returns a Promise for the result of resolving that promise.
......@@ -296,12 +333,11 @@ private:
EventListHead queue;
template <typename T, typename Func, typename ErrorFunc>
auto thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const
-> Own<_::PromiseNode<_::FixVoid<PromiseType<_::ReturnType<Func, T>>>>>;
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const;
// Shared implementation of there() and Promise::then().
void loopWhile(bool& keepGoing);
// Run the event loop until keepGoing becomes false.
void waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result);
// Run the event loop until `node` is fulfilled, and then `get()` its result into `result`.
template <typename>
friend class Promise;
......@@ -327,14 +363,44 @@ private:
// -------------------------------------------------------------------
class PromiseBase {
public:
PromiseBase(PromiseBase&&) = default;
PromiseBase& operator=(PromiseBase&&) = default;
inline ~PromiseBase() { absolve(); }
void absolve();
// Explicitly cancel the async operation and free all related local data structures. This is
// called automatically by Promise's destructor, but is sometimes useful to call explicitly.
//
// Any exceptions thrown by destructors of objects that were handling the async operation will
// be caught and discarded, on the assumption that such exceptions are a side-effect of deleting
// these structures while they were in the middle of doing something. Presumably, you do not
// care. In contrast, if you were to call `then()` or `wait()`, such exceptions would be caught
// and propagated.
private:
Own<_::PromiseNode> node;
PromiseBase() = default;
PromiseBase(Own<_::PromiseNode>&& node): node(kj::mv(node)) {}
friend class EventLoop;
friend class _::ChainPromiseNode;
template <typename>
friend class Promise;
};
template <typename T>
class Promise {
class Promise: public PromiseBase {
// The basic primitive of asynchronous computation in KJ. Similar to "futures", but more
// powerful. Similar to E promises and JavaScript Promises/A.
//
// A Promise represents a promise to produce a value of type T some time in the future. Once
// that value has been produced, the promise is "fulfilled". Alternatively, a promise can be
// "broken", with an Exception describing what went wrong.
// "broken", with an Exception describing what went wrong. You may implicitly convert a value of
// type T to an already-fulfilled Promise<T>. You may implicitly convert the constant
// `kj::READY_NOW` to an already-fulfilled Promise<void>.
//
// Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed
// or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
......@@ -343,9 +409,32 @@ class Promise {
// To use the result of a Promise, you must call `then()` and supply a callback function to
// call with the result. `then()` returns another promise, for the result of the callback.
// Any time that this would result in Promise<Promise<T>>, the promises are collapsed into a
// simple Promise<T> that first waits for the outer promise, then the inner.
// simple Promise<T> that first waits for the outer promise, then the inner. Example:
//
// You may implicitly convert a value of type T to an already-fulfilled Promise<T>.
// // Open a remote file, read the content, and then count the
// // number of lines of text.
// // Note that none of the calls here block. `file`, `content`
// // and `lineCount` are all initialized immediately before any
// // asynchronous operations occur. The lambda callbacks are
// // called later.
// Promise<Own<File>> file = openFtp("ftp://host/foo/bar");
// Promise<String> content = file.then(
// [](Own<File> file) -> Promise<String> {
// return file.readAll();
// });
// Promise<int> lineCount = content.then(
// [](String text) -> int {
// uint count = 0;
// for (char c: text) count += (c == '\n');
// return count;
// });
//
// For `then()` to work, the current thread must be looping in an `EventLoop`. Each callback
// is scheduled to execute in that loop. Since `then()` schedules callbacks only on the current
// thread's event loop, you do not need to worry about two callbacks running at the same time.
// If you explicitly _want_ a callback to run on some other thread, you can schedule it there
// using the `EventLoop` interface. You will need to set up at least one `EventLoop` at the top
// level of your program before you can use promises.
//
// To adapt a non-Promise-based asynchronous API to promises, use `newAdaptedPromise()`.
//
......@@ -367,15 +456,18 @@ class Promise {
// https://github.com/domenic/promises-unwrapping
public:
Promise(_::FixVoid<T>&& value);
Promise(_::FixVoid<T> value);
// Construct an already-fulfilled Promise from a value of type T. For non-void promises, the
// parameter type is simply T. So, e.g., in a function that returns `Promise<int>`, you can
// say `return 123;` to return a promise that is already fulfilled to 123.
//
// For void promises, use `kj::READY_NOW` as the value, e.g. `return kj::READY_NOW`.
inline Promise(decltype(nullptr)) {}
inline ~Promise() { absolve(); }
Promise(Promise&&) = default;
Promise& operator=(Promise&&) = default;
template <typename Func, typename ErrorFunc = PropagateException>
auto then(Func&& func, ErrorFunc&& errorHandler = PropagateException())
-> Promise<PromiseType<_::ReturnType<Func, T>>>;
template <typename Func, typename ErrorFunc = _::PropagateException>
auto then(Func&& func, ErrorFunc&& errorHandler = _::PropagateException())
-> PromiseForResult<Func, T>;
// Mostly equivalent to `EventLoop::current().there(kj::mv(*this), func, errorHandler)`.
//
// Note that `then()` consumes the promise on which it is called, in the sense of move semantics.
......@@ -410,28 +502,18 @@ public:
// After returning, the promise is no longer valid, and cannot be `wait()`ed on or `then()`ed
// again.
void absolve();
// Explicitly cancel the async operation and free all related local data structures. This is
// called automatically by Promise's destructor, but is sometimes useful to call explicitly.
//
// Any exceptions thrown by destructors of objects that were handling the async operation will
// be caught and discarded, on the assumption that such exceptions are a side-effect of deleting
// these structures while they were in the middle of doing something. Presumably, you do not
// care. In contrast, if you were to call `then()` or `wait()`, such exceptions would be caught
// and propagated.
private:
Own<_::PromiseNode<_::FixVoid<T>>> node;
Promise(Own<_::PromiseNode<_::FixVoid<T>>>&& node): node(kj::mv(node)) {}
Promise(Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {}
friend class EventLoop;
template <typename>
friend class _::ChainPromiseNode;
template <typename U, typename Adapter, typename... Params>
friend Promise<U> newAdaptedPromise(Params&&... adapterConstructorParams);
};
constexpr _::Void READY_NOW = _::Void();
// Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
// cast to `Promise<void>`.
// -------------------------------------------------------------------
// Advanced promise construction
......@@ -517,13 +599,13 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller();
namespace _ { // private
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::EventLoop::Event*>(1)
template <typename T>
struct ExceptionOr {
ExceptionOr() = default;
ExceptionOr(T&& value): value(kj::mv(value)) {}
ExceptionOr(bool, Exception&& exception): exception(kj::mv(exception)) {}
class ExceptionOr;
class ExceptionOrValue {
public:
ExceptionOrValue(bool, Exception&& exception): exception(kj::mv(exception)) {}
KJ_DISALLOW_COPY(ExceptionOrValue);
void addException(Exception&& exception) {
if (this->exception == nullptr) {
......@@ -531,26 +613,46 @@ struct ExceptionOr {
}
}
Maybe<T> value;
template <typename T>
ExceptionOr<T>& as() { return *static_cast<ExceptionOr<T>*>(this); }
Maybe<Exception> exception;
protected:
// Allow subclasses to have move constructor / assignment.
ExceptionOrValue() = default;
ExceptionOrValue(ExceptionOrValue&& other) = default;
ExceptionOrValue& operator=(ExceptionOrValue&& other) = default;
};
template <typename T>
class ExceptionOr: public ExceptionOrValue {
public:
ExceptionOr() = default;
ExceptionOr(T&& value): value(kj::mv(value)) {}
ExceptionOr(bool, Exception&& exception): ExceptionOrValue(false, kj::mv(exception)) {}
ExceptionOr(ExceptionOr&&) = default;
ExceptionOr& operator=(ExceptionOr&&) = default;
Maybe<T> value;
};
class PromiseNode {
// A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
//
// TODO(perf): Maybe PromiseNode should not be a template? ExceptionOr<T> could subclass some
// generic type, and then only certain key pieces of code that really need to know what T is
// would need to be templated. Several of the node types might not need any templating at
// all. This would save a lot of code generation.
// To reduce generated code bloat, PromiseNode is not a template. Instead, it makes very hacky
// use of pointers to ExceptionOrValue which actually point to ExceptionOr<T>, but are only
// so down-cast in the few places that really need to be templated. Luckily this is all
// internal implementation details.
public:
virtual bool onReady(EventLoop::Event& event) noexcept = 0;
// Returns true if already ready, otherwise arms the given event when ready.
virtual ExceptionOr<T> get() noexcept = 0;
// Get the result. Can only be called once, and only after the node is ready. Must be
// called directly from the event loop, with no application code on the stack.
virtual void get(ExceptionOrValue& output) noexcept = 0;
// Get the result. `output` points to an ExceptionOr<T> into which the result will be written.
// Can only be called once, and only after the node is ready. Must be called directly from the
// event loop, with no application code on the stack.
virtual Maybe<const EventLoop&> getSafeEventLoop() noexcept = 0;
// Returns an EventLoop from which get() and onReady() may safely be called. If the node has
......@@ -565,94 +667,92 @@ public:
}
protected:
static bool atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Event& newEvent) {
// If onReadyEvent is null, atomically set it to point at newEvent and return false.
// If onReadyEvent is _kJ_ALREADY_READY, return true.
// Useful for implementing onReady() thread-safely.
EventLoop::Event* oldEvent = nullptr;
if (__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, &newEvent, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
// Event was swapped in and will be called later.
return false;
} else {
// `onReadyEvent` is not null. If it is _kJ_ALREADY_READY then this promise was fulfilled
// before any dependent existed, otherwise there is already a different dependent.
KJ_IREQUIRE(oldEvent == _kJ_ALREADY_READY, "onReady() can only be called once.");
return true;
}
}
static void atomicReady(EventLoop::Event*& onReadyEvent) {
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjuction with atomicOnReady().
static bool atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Event& newEvent);
// If onReadyEvent is null, atomically set it to point at newEvent and return false.
// If onReadyEvent is _kJ_ALREADY_READY, return true.
// Useful for implementing onReady() thread-safely.
static void atomicReady(EventLoop::Event*& onReadyEvent);
// If onReadyEvent is null, atomically set it to _kJ_ALREADY_READY.
// Otherwise, arm whatever it points at.
// Useful for firing events in conjuction with atomicOnReady().
};
EventLoop::Event* oldEvent = nullptr;
if (!__atomic_compare_exchange_n(&onReadyEvent, &oldEvent, _kJ_ALREADY_READY, false,
__ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
oldEvent->arm();
}
}
class ImmediatePromiseNodeBase: public PromiseNode {
public:
bool onReady(EventLoop::Event& event) noexcept override;
Maybe<const EventLoop&> getSafeEventLoop() noexcept override;
};
template <typename T>
class ImmediatePromiseNode final: public PromiseNode<T> {
class ImmediatePromiseNode final: public ImmediatePromiseNodeBase {
// A promise that has already been resolved to an immediate value or exception.
public:
ImmediatePromiseNode(ExceptionOr<T>&& result): result(kj::mv(result)) {}
bool onReady(EventLoop::Event& event) noexcept override { return true; }
ExceptionOr<T> get() noexcept override { return kj::mv(result); }
Maybe<const EventLoop&> getSafeEventLoop() noexcept override { return nullptr; }
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc>
class TransformPromiseNode final: public PromiseNode<T> {
// A PromiseNode that transforms the result of another PromiseNode through an application-provided
// function (implements `then()`).
class ImmediateBrokenPromiseNode final: public ImmediatePromiseNodeBase {
public:
TransformPromiseNode(const EventLoop& loop, Own<PromiseNode<DepT>>&& dependency,
Func&& func, ErrorFunc&& errorHandler)
: loop(loop), dependency(kj::mv(dependency)), func(kj::fwd<Func>(func)),
errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
ImmediateBrokenPromiseNode(Exception&& exception);
bool onReady(EventLoop::Event& event) noexcept override {
return dependency->onReady(event);
}
void get(ExceptionOrValue& output) noexcept override;
ExceptionOr<T> get() noexcept override {
ExceptionOr<T> result;
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
ExceptionOr<DepT> depResult = dependency->get();
KJ_IF_MAYBE(depException, depResult.exception) {
result = handle(errorHandler(kj::mv(*depException)));
} else KJ_IF_MAYBE(depValue, depResult.value) {
result = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
}
})) {
result.addException(kj::mv(*exception));
}
private:
Exception exception;
};
return kj::mv(result);
}
class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(const EventLoop& loop, Own<PromiseNode>&& dependency);
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
return loop;
}
bool onReady(EventLoop::Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
Maybe<const EventLoop&> getSafeEventLoop() noexcept override;
private:
const EventLoop& loop;
Own<PromiseNode<DepT>> dependency;
Own<PromiseNode> dependency;
virtual void getImpl(ExceptionOrValue& output) = 0;
template <typename, typename, typename, typename>
friend class TransformPromiseNode;
};
template <typename T, typename DepT, typename Func, typename ErrorFunc>
class TransformPromiseNode final: public TransformPromiseNodeBase {
// A PromiseNode that transforms the result of another PromiseNode through an application-provided
// function (implements `then()`).
public:
TransformPromiseNode(const EventLoop& loop, Own<PromiseNode>&& dependency,
Func&& func, ErrorFunc&& errorHandler)
: TransformPromiseNodeBase(loop, kj::mv(dependency)),
func(kj::fwd<Func>(func)), errorHandler(kj::fwd<ErrorFunc>(errorHandler)) {}
private:
Func func;
ErrorFunc errorHandler;
void getImpl(ExceptionOrValue& output) override {
ExceptionOr<DepT> depResult;
dependency->get(depResult);
KJ_IF_MAYBE(depException, depResult.exception) {
output.as<T>() = handle(MaybeVoidCaller<Exception&&, T>::apply(
errorHandler, kj::mv(*depException)));
} else KJ_IF_MAYBE(depValue, depResult.value) {
output.as<T>() = handle(MaybeVoidCaller<DepT, T>::apply(func, kj::mv(*depValue)));
}
}
ExceptionOr<T> handle(T&& value) {
return kj::mv(value);
}
......@@ -661,52 +761,14 @@ private:
}
};
template <typename T>
class ChainPromiseNode final: public PromiseNode<T>, private EventLoop::Event {
// Adapts a PromiseNode<Promise<T>> to PromiseNode<T>, by first waiting for the outer promise,
// then waiting for the inner promise.
class ChainPromiseNode final: public PromiseNode, private EventLoop::Event {
public:
inline ChainPromiseNode(const EventLoop& loop, Own<PromiseNode<Promise<UnfixVoid<T>>>> step1)
: Event(loop), state(PRE_STEP1), step1(kj::mv(step1)) {
KJ_IREQUIRE(this->step1->isSafeEventLoop(loop));
arm();
}
ChainPromiseNode(const EventLoop& loop, Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
~ChainPromiseNode() {
disarm();
switch (state) {
case PRE_STEP1:
case STEP1:
dtor(step1);
break;
case STEP2:
dtor(step2);
break;
}
}
bool onReady(EventLoop::Event& event) noexcept override {
switch (state) {
case PRE_STEP1:
case STEP1:
KJ_IREQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
onReadyEvent = &event;
return false;
case STEP2:
return step2->onReady(event);
}
KJ_UNREACHABLE;
}
ExceptionOr<T> get() noexcept override {
KJ_IREQUIRE(state == STEP2);
return step2->get();
}
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
return getEventLoop();
}
bool onReady(EventLoop::Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
Maybe<const EventLoop&> getSafeEventLoop() noexcept override;
private:
enum State {
......@@ -717,135 +779,68 @@ private:
State state;
union {
Own<PromiseNode<Promise<UnfixVoid<T>>>> step1;
Own<PromiseNode<T>> step2;
};
Own<PromiseNode> inner;
// In PRE_STEP1 / STEP1, a PromiseNode for a Promise<T>.
// In STEP2, a PromiseNode for a T.
EventLoop::Event* onReadyEvent = nullptr;
void fire() override {
if (state == PRE_STEP1 && !step1->onReady(*this)) {
state = STEP1;
return;
}
KJ_IREQUIRE(state != STEP2);
ExceptionOr<Promise<UnfixVoid<T>>> intermediate = step1->get();
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
dtor(step1);
})) {
intermediate.addException(kj::mv(*exception));
}
// We're in a dangerous state here where neither step1 nor step2 is initialized, but we know
// that none of the below can throw until we set state = STEP2.
KJ_IF_MAYBE(exception, intermediate.exception) {
// There is an exception. If there is also a value, delete it.
kj::runCatchingExceptions([&,this]() { intermediate.value = nullptr; });
// Now set step2 to a rejected promise.
ctor(step2, heap<ImmediatePromiseNode<T>>(ExceptionOr<T>(false, kj::mv(*exception))));
} else KJ_IF_MAYBE(value, intermediate.value) {
// There is a value and no exception. The value is itself a promise. Adopt it as our
// step2.
ctor(step2, kj::mv(value->node));
} else {
// We can only get here if step1->get() returned neither an exception nor a
// value, which never actually happens.
ctor(step2, heap<ImmediatePromiseNode<T>>(ExceptionOr<T>()));
}
state = STEP2;
if (onReadyEvent != nullptr) {
if (step2->onReady(*onReadyEvent)) {
onReadyEvent->arm();
}
}
}
void fire() override;
};
template <typename T>
Own<PromiseNode<FixVoid<T>>> maybeChain(
Own<PromiseNode<Promise<T>>>&& node, const EventLoop& loop) {
return heap<ChainPromiseNode<FixVoid<T>>>(loop, kj::mv(node));
Own<PromiseNode> maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, Promise<T>*) {
return heap<ChainPromiseNode>(loop, kj::mv(node));
}
template <typename T>
Own<PromiseNode<T>>&& maybeChain(Own<PromiseNode<T>>&& node, const EventLoop& loop) {
Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, const EventLoop& loop, T*) {
return kj::mv(node);
}
template <typename T>
class CrossThreadPromiseNode final: public PromiseNode<T>, private EventLoop::Event {
class CrossThreadPromiseNodeBase: public PromiseNode, private EventLoop::Event {
// A PromiseNode that safely imports a promised value from one EventLoop to another (which
// implies crossing threads).
public:
CrossThreadPromiseNode(const EventLoop& loop, Own<PromiseNode<T>>&& dependent)
: Event(loop), dependent(kj::mv(dependent)) {
KJ_IREQUIRE(this->dependent->isSafeEventLoop(loop));
CrossThreadPromiseNodeBase(const EventLoop& loop, Own<PromiseNode>&& dependent,
ExceptionOrValue& resultRef);
~CrossThreadPromiseNodeBase() noexcept(false);
// The constructor may be called from any thread, so before we can even call onReady() we need
// to switch threads.
arm();
}
~CrossThreadPromiseNode() {
disarm();
}
bool onReady(EventLoop::Event& event) noexcept override {
return PromiseNode<T>::atomicOnReady(onReadyEvent, event);
}
ExceptionOr<T> get() noexcept override {
KJ_IF_MAYBE(r, result) {
return kj::mv(*r);
} else {
KJ_IREQUIRE(false, "Called get() before ready.");
KJ_UNREACHABLE;
}
}
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
return nullptr;
}
bool onReady(EventLoop::Event& event) noexcept override;
Maybe<const EventLoop&> getSafeEventLoop() noexcept override;
private:
Own<PromiseNode<T>> dependent;
Own<PromiseNode> dependent;
EventLoop::Event* onReadyEvent = nullptr;
Maybe<ExceptionOr<T>> result;
ExceptionOrValue& resultRef;
bool isWaiting = false;
void fire() override {
if (!isWaiting && !this->dependent->onReady(*this)) {
isWaiting = true;
} else {
ExceptionOr<T> result = dependent->get();
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
auto deleteMe = kj::mv(dependent);
})) {
result.addException(kj::mv(*exception));
}
this->result = kj::mv(result);
// If onReadyEvent is null, set it to _kJ_ALREADY_READY. Otherwise, arm it.
PromiseNode<T>::atomicReady(onReadyEvent);
}
void fire() override;
};
template <typename T>
class CrossThreadPromiseNode final: public CrossThreadPromiseNodeBase {
public:
CrossThreadPromiseNode(const EventLoop& loop, Own<PromiseNode>&& dependent)
: CrossThreadPromiseNodeBase(loop, kj::mv(dependent), result) {}
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
ExceptionOr<T> result;
};
template <typename T>
Own<PromiseNode<T>> makeSafeForLoop(Own<PromiseNode<T>>&& node, const EventLoop* loop) {
Own<PromiseNode> makeSafeForLoop(Own<PromiseNode>&& node, const EventLoop& loop) {
// If the node cannot safely be used in the given loop (thread), wrap it in one that can.
KJ_IF_MAYBE(preferred, node->getSafeEventLoop()) {
if (loop != preferred) {
if (&loop != preferred) {
return heap<CrossThreadPromiseNode<T>>(*preferred, kj::mv(node));
}
}
......@@ -853,14 +848,29 @@ Own<PromiseNode<T>> makeSafeForLoop(Own<PromiseNode<T>>&& node, const EventLoop*
}
template <typename T>
Own<PromiseNode<T>> spark(Own<PromiseNode<T>>&& node, const EventLoop& loop) {
Own<PromiseNode> spark(Own<PromiseNode>&& node, const EventLoop& loop) {
// Forces evaluation of the given node to begin as soon as possible, even if no one is waiting
// on it.
return heap<CrossThreadPromiseNode<T>>(loop, kj::mv(node));
}
class AdapterPromiseNodeBase: public PromiseNode {
public:
bool onReady(EventLoop::Event& event) noexcept override;
Maybe<const EventLoop&> getSafeEventLoop() noexcept override;
protected:
inline void setReady() {
PromiseNode::atomicReady(onReadyEvent);
}
private:
EventLoop::Event* onReadyEvent = nullptr;
};
template <typename T, typename Adapter>
class AdapterPromiseNode final: public PromiseNode<T>, private PromiseFulfiller<UnfixVoid<T>> {
class AdapterPromiseNode final: public AdapterPromiseNodeBase,
private PromiseFulfiller<UnfixVoid<T>> {
// A PromiseNode that wraps a PromiseAdapter.
public:
......@@ -868,35 +878,25 @@ public:
AdapterPromiseNode(Params&&... params)
: adapter(static_cast<PromiseFulfiller<UnfixVoid<T>>&>(*this), kj::fwd<Params>(params)...) {}
bool onReady(EventLoop::Event& event) noexcept override {
return PromiseNode<T>::atomicOnReady(onReadyEvent, event);
}
ExceptionOr<T> get() noexcept override {
return kj::mv(result);
}
Maybe<const EventLoop&> getSafeEventLoop() noexcept override {
// We're careful to be thread-safe so any thread is OK.
return nullptr;
void get(ExceptionOrValue& output) noexcept override {
output.as<T>() = kj::mv(result);
}
private:
Adapter adapter;
EventLoop::Event* onReadyEvent = nullptr;
ExceptionOr<T> result;
void fulfill(T&& value) override {
if (isWaiting()) {
result = ExceptionOr<T>(kj::mv(value));
PromiseNode<T>::atomicReady(onReadyEvent);
setReady();
}
}
void reject(Exception&& exception) override {
if (isWaiting()) {
result = ExceptionOr<T>(false, kj::mv(exception));
PromiseNode<T>::atomicReady(onReadyEvent);
setReady();
}
}
......@@ -905,44 +905,21 @@ private:
}
};
// =======================================================================================
class WaitEvent: public EventLoop::Event {
public:
WaitEvent(const EventLoop& loop): Event(loop) {}
~WaitEvent() { disarm(); }
bool keepGoing = true;
// TODO(now): Move to .c++ file
void fire() override {
keepGoing = false;
}
};
} // namespace _ (private)
template <typename T>
T EventLoop::wait(Promise<T>&& promise) {
// Make sure we can safely call node->get() outside of the event loop.
Own<_::PromiseNode<_::FixVoid<T>>> node = _::makeSafeForLoop(kj::mv(promise.node), nullptr);
_::ExceptionOr<_::FixVoid<T>> result;
_::WaitEvent event(*this);
if (!node->onReady(event)) {
loopWhile(event.keepGoing);
}
_::ExceptionOr<_::FixVoid<T>> result = node->get();
waitImpl(_::makeSafeForLoop<_::FixVoid<T>>(kj::mv(promise.node), *this), result);
KJ_IF_MAYBE(exception, result.exception) {
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
return _::returnMaybeVoid(kj::mv(*value));
} else {
throwFatalException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(value, result.value) {
return _::returnMaybeVoid(kj::mv(*value));
} else KJ_IF_MAYBE(exception, result.exception) {
throwFatalException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
......@@ -950,37 +927,36 @@ T EventLoop::wait(Promise<T>&& promise) {
}
template <typename Func>
auto EventLoop::evalLater(Func&& func) const -> Promise<PromiseType<_::ReturnType<Func, void>>> {
return there(Promise<void>(_::Void()), kj::fwd<Func>(func));
auto EventLoop::evalLater(Func&& func) const -> PromiseForResult<Func, void> {
return there(Promise<void>(READY_NOW), kj::fwd<Func>(func));
}
template <typename T, typename Func, typename ErrorFunc>
auto EventLoop::there(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const
-> Promise<PromiseType<_::ReturnType<Func, T>>> {
return _::spark(thereImpl(
-> PromiseForResult<Func, T> {
return _::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, T>>>>(thereImpl(
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)), *this);
}
template <typename T, typename Func, typename ErrorFunc>
auto EventLoop::thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const
-> Own<_::PromiseNode<_::FixVoid<PromiseType<_::ReturnType<Func, T>>>>> {
Own<_::PromiseNode> EventLoop::thereImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler) const {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode<ResultT>> intermediate =
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
*this, _::makeSafeForLoop(kj::mv(promise.node), this),
*this, _::makeSafeForLoop<_::FixVoid<T>>(kj::mv(promise.node), *this),
kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return _::maybeChain(kj::mv(intermediate), *this);
return _::maybeChain(kj::mv(intermediate), *this, implicitCast<ResultT*>(nullptr));
}
template <typename T>
Promise<T>::Promise(_::FixVoid<T>&& value)
: node(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
Promise<T>::Promise(_::FixVoid<T> value)
: PromiseBase(heap<_::ImmediatePromiseNode<_::FixVoid<T>>>(kj::mv(value))) {}
template <typename T>
template <typename Func, typename ErrorFunc>
auto Promise<T>::then(Func&& func, ErrorFunc&& errorHandler)
-> Promise<PromiseType<_::ReturnType<Func, T>>> {
auto Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) -> PromiseForResult<Func, T> {
return EventLoop::current().thereImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
}
......@@ -990,11 +966,6 @@ T Promise<T>::wait() {
return EventLoop::current().wait(kj::mv(*this));
}
template <typename T>
void Promise<T>::absolve() {
runCatchingExceptions([this]() { auto deleteMe = kj::mv(node); });
}
// =======================================================================================
namespace _ { // private
......
......@@ -149,12 +149,12 @@ namespace kj {
#define _kJ_NONNULL(nature, value, ...) \
(*({ \
auto result = ::kj::_::readMaybe(value); \
if (KJ_UNLIKELY(!result)) { \
auto _kj_result = ::kj::_::readMaybe(value); \
if (KJ_UNLIKELY(!_kj_result)) { \
::kj::_::Debug::Fault(__FILE__, __LINE__, ::kj::Exception::Nature::nature, 0, \
#value " != nullptr", #__VA_ARGS__, ##__VA_ARGS__).fatal(); \
} \
result; \
_kj_result; \
}))
#define KJ_ASSERT_NONNULL(value, ...) _kJ_NONNULL(LOCAL_BUG, value, ##__VA_ARGS__)
#define KJ_REQUIRE_NONNULL(value, ...) _kJ_NONNULL(PRECONDITION, value, ##__VA_ARGS__)
......
......@@ -126,6 +126,11 @@ public:
return *this;
}
inline Own& operator=(decltype(nullptr)) {
dispose();
return *this;
}
inline T* operator->() { return ptr; }
inline const T* operator->() const { return ptr; }
inline T& operator*() { return *ptr; }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment