Commit 03f39afe authored by Kenton Varda's avatar Kenton Varda

Add Promise::split(), which transforms Promise<Tuple> into Tuple<Promise>.

parent 8c01fe73
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#ifndef KJ_ASYNC_H_ #ifndef KJ_ASYNC_H_
#error "Do not include this directly; include kj/async.h." #error "Do not include this directly; include kj/async.h."
#include "async.h" // help IDE parse this file
#endif #endif
#ifndef KJ_ASYNC_INL_H_ #ifndef KJ_ASYNC_INL_H_
...@@ -444,6 +445,28 @@ public: ...@@ -444,6 +445,28 @@ public:
} }
}; };
template <typename T, size_t index>
class SplitBranch final: public ForkBranchBase {
// A PromiseNode that implements one branch of a fork -- i.e. one of the branches that receives
// a const reference.
public:
SplitBranch(Own<ForkHubBase>&& hub): ForkBranchBase(kj::mv(hub)) {}
typedef kj::Decay<decltype(kj::get<index>(kj::instance<T>()))> Element;
void get(ExceptionOrValue& output) noexcept override {
ExceptionOr<T>& hubResult = getHubResultRef().template as<T>();
KJ_IF_MAYBE(value, hubResult.value) {
output.as<Element>().value = kj::mv(kj::get<index>(*value));
} else {
output.as<Element>().value = nullptr;
}
output.exception = hubResult.exception;
releaseHub(output);
}
};
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class ForkHubBase: public Refcounted, protected Event { class ForkHubBase: public Refcounted, protected Event {
...@@ -479,8 +502,24 @@ public: ...@@ -479,8 +502,24 @@ public:
return Promise<_::UnfixVoid<T>>(false, kj::heap<ForkBranch<T>>(addRef(*this))); return Promise<_::UnfixVoid<T>>(false, kj::heap<ForkBranch<T>>(addRef(*this)));
} }
_::SplitTuplePromise<T> split() {
return splitImpl(MakeIndexes<tupleSize<T>()>());
}
private: private:
ExceptionOr<T> result; ExceptionOr<T> result;
template <size_t... indexes>
_::SplitTuplePromise<T> splitImpl(Indexes<indexes...>) {
return kj::tuple(addSplit<indexes>()...);
}
template <size_t index>
Promise<JoinPromises<typename SplitBranch<T, index>::Element>> addSplit() {
return Promise<JoinPromises<typename SplitBranch<T, index>::Element>>(
false, maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)),
implicitCast<typename SplitBranch<T, index>::Element*>(nullptr)));
}
}; };
inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
...@@ -833,6 +872,11 @@ Promise<T> ForkedPromise<T>::addBranch() { ...@@ -833,6 +872,11 @@ Promise<T> ForkedPromise<T>::addBranch() {
return hub->addBranch(); return hub->addBranch();
} }
template <typename T>
_::SplitTuplePromise<T> Promise<T>::split() {
return refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))->split();
}
template <typename T> template <typename T>
Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) { Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) {
return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node))); return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node)));
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#endif #endif
#include "exception.h" #include "exception.h"
#include "tuple.h"
namespace kj { namespace kj {
...@@ -86,6 +87,17 @@ using ReturnType = typename ReturnType_<Func, T>::Type; ...@@ -86,6 +87,17 @@ using ReturnType = typename ReturnType_<Func, T>::Type;
// The return type of functor Func given a parameter of type T, with the special exception that if // The return type of functor Func given a parameter of type T, with the special exception that if
// T is void, this is the return type of Func called with no arguments. // T is void, this is the return type of Func called with no arguments.
template <typename T> struct SplitTuplePromise_ { typedef Promise<T> Type; };
template <typename... T>
struct SplitTuplePromise_<kj::_::Tuple<T...>> {
typedef kj::Tuple<Promise<JoinPromises<T>>...> Type;
};
template <typename T>
using SplitTuplePromise = typename SplitTuplePromise_<T>::Type;
// T -> Promise<T>
// Tuple<T> -> Tuple<Promise<T>>
struct Void {}; struct Void {};
// Application code should NOT refer to this! See `kj::READY_NOW` instead. // Application code should NOT refer to this! See `kj::READY_NOW` instead.
......
...@@ -492,6 +492,21 @@ TEST(Async, ForkRef) { ...@@ -492,6 +492,21 @@ TEST(Async, ForkRef) {
EXPECT_EQ(789, branch2.wait(waitScope)); EXPECT_EQ(789, branch2.wait(waitScope));
} }
TEST(Async, Split) {
EventLoop loop;
WaitScope waitScope(loop);
Promise<Tuple<int, String, Promise<int>>> promise = evalLater([&]() {
return kj::tuple(123, str("foo"), Promise<int>(321));
});
Tuple<Promise<int>, Promise<String>, Promise<int>> split = promise.split();
EXPECT_EQ(123, get<0>(split).wait(waitScope));
EXPECT_EQ("foo", get<1>(split).wait(waitScope));
EXPECT_EQ(321, get<2>(split).wait(waitScope));
}
TEST(Async, ExclusiveJoin) { TEST(Async, ExclusiveJoin) {
{ {
EventLoop loop; EventLoop loop;
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include "async-prelude.h" #include "async-prelude.h"
#include "exception.h" #include "exception.h"
#include "refcount.h" #include "refcount.h"
#include "tuple.h"
namespace kj { namespace kj {
...@@ -242,6 +241,12 @@ public: ...@@ -242,6 +241,12 @@ public:
// `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same // `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
// (or an equivalent) object (probably implemented via reference counting). // (or an equivalent) object (probably implemented via reference counting).
_::SplitTuplePromise<T> split();
// Split a promise for a tuple into a tuple of promises.
//
// E.g. if you have `Promise<kj::Tuple<T, U>>`, `split()` returns
// `kj::Tuple<Promise<T>, Promise<U>>`.
Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT; Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT;
// Return a new promise that resolves when either the original promise resolves or `other` // Return a new promise that resolves when either the original promise resolves or `other`
// resolves (whichever comes first). The promise that didn't resolve first is canceled. // resolves (whichever comes first). The promise that didn't resolve first is canceled.
......
...@@ -350,6 +350,15 @@ inline auto apply(Func&& func, Params&&... params) ...@@ -350,6 +350,15 @@ inline auto apply(Func&& func, Params&&... params)
return _::expandAndApply(kj::fwd<Func>(func), kj::fwd<Params>(params)...); return _::expandAndApply(kj::fwd<Func>(func), kj::fwd<Params>(params)...);
} }
template <typename T> struct TupleSize_ { static constexpr size_t size = 1; };
template <typename... T> struct TupleSize_<_::Tuple<T...>> {
static constexpr size_t size = sizeof...(T);
};
template <typename T>
constexpr size_t tupleSize() { return TupleSize_<T>::size; }
// Returns size of the tuple T.
} // namespace kj } // namespace kj
#endif // KJ_TUPLE_H_ #endif // KJ_TUPLE_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment