Commit 7f7e1565 authored by Kenton Varda's avatar Kenton Varda

Automatically reduce `Promise<RemotePromise<T>>` -> `RemotePromise<T>`.

Just like `Promise<Promise<T>>` reduces to `Promise<T>`, we should be able to reduce `Promise<RemotePromise<T>>`. Moreover, this reduction should support pipelining, hence the reduced type should itself be `RemotePromise<T>`.

Fixes #341
parent 30c7d54b
......@@ -1031,6 +1031,55 @@ TEST(Capability, TransferCap) {
}).wait(waitScope);
}
KJ_TEST("Promise<RemotePromise<T>> automatically reduces to RemotePromise<T>") {
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
int callCount = 0;
test::TestInterface::Client client(kj::heap<TestInterfaceImpl>(callCount));
RemotePromise<test::TestInterface::FooResults> promise = kj::evalLater([&]() {
auto request = client.fooRequest();
request.setI(123);
request.setJ(true);
return request.send();
});
EXPECT_EQ(0, callCount);
auto response = promise.wait(waitScope);
EXPECT_EQ("foo", response.getX());
EXPECT_EQ(1, callCount);
}
KJ_TEST("Promise<RemotePromise<T>> automatically reduces to RemotePromise<T> with pipelining") {
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
int callCount = 0;
int chainedCallCount = 0;
test::TestPipeline::Client client(kj::heap<TestPipelineImpl>(callCount));
auto promise = kj::evalLater([&]() {
auto request = client.getCapRequest();
request.setN(234);
request.setInCap(test::TestInterface::Client(kj::heap<TestInterfaceImpl>(chainedCallCount)));
return request.send();
});
auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
pipelineRequest.setI(321);
auto pipelinePromise = pipelineRequest.send();
EXPECT_EQ(0, callCount);
EXPECT_EQ(0, chainedCallCount);
auto response = pipelinePromise.wait(waitScope);
EXPECT_EQ("bar", response.getX());
EXPECT_EQ(2, callCount);
EXPECT_EQ(1, chainedCallCount);
}
} // namespace
} // namespace _
} // namespace capnp
......@@ -60,6 +60,9 @@ public:
KJ_DISALLOW_COPY(RemotePromise);
RemotePromise(RemotePromise&& other) = default;
RemotePromise& operator=(RemotePromise&& other) = default;
static RemotePromise<T> reducePromise(kj::Promise<RemotePromise>&& promise);
// Hook for KJ so that Promise<RemotePromise<T>> automatically reduces to RemotePromise<T>.
};
class LocalClient;
......@@ -737,6 +740,19 @@ private:
// =======================================================================================
// Inline implementation details
template <typename T>
RemotePromise<T> RemotePromise<T>::reducePromise(kj::Promise<RemotePromise>&& promise) {
kj::Tuple<kj::Promise<Response<T>>, kj::Promise<kj::Own<PipelineHook>>> splitPromise =
promise.then([](RemotePromise&& inner) {
return kj::tuple(kj::Promise<Response<T>>(kj::mv(inner)),
PipelineHook::from(kj::mv(inner)));
}).split();
return RemotePromise(kj::mv(kj::get<0>(splitPromise)),
typename T::Pipeline(AnyPointer::Pipeline(
newLocalPromisePipeline(kj::mv(kj::get<1>(splitPromise))))));
}
template <typename Params, typename Results>
RemotePromise<Results> Request<Params, Results>::send() {
auto typelessPromise = hook->send();
......
......@@ -524,8 +524,8 @@ private:
}
template <size_t index>
Promise<JoinPromises<typename SplitBranch<T, index>::Element>> addSplit() {
return Promise<JoinPromises<typename SplitBranch<T, index>::Element>>(
ReducePromises<typename SplitBranch<T, index>::Element> addSplit() {
return ReducePromises<typename SplitBranch<T, index>::Element>(
false, maybeChain(kj::heap<SplitBranch<T, index>>(addRef(*this)),
implicitCast<typename SplitBranch<T, index>::Element*>(nullptr)));
}
......@@ -580,6 +580,16 @@ Own<PromiseNode>&& maybeChain(Own<PromiseNode>&& node, T*) {
return kj::mv(node);
}
template <typename T, typename Result = decltype(T::reducePromise(instance<Promise<T>>()))>
inline Result maybeReduce(Promise<T>&& promise, bool) {
return T::reducePromise(kj::mv(promise));
}
template <typename T>
inline Promise<T> maybeReduce(Promise<T>&& promise, ...) {
return kj::mv(promise);
}
// -------------------------------------------------------------------
class ExclusiveJoinPromiseNode final: public PromiseNode {
......@@ -809,8 +819,9 @@ PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler
Own<_::PromiseNode> intermediate =
heap<_::TransformPromiseNode<ResultT, _::FixVoid<T>, Func, ErrorFunc>>(
kj::mv(node), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler));
return PromiseForResult<Func, T>(false,
auto result = _::ChainPromises<_::ReturnType<Func, T>>(false,
_::maybeChain(kj::mv(intermediate), implicitCast<ResultT*>(nullptr)));
return _::maybeReduce(kj::mv(result), false);
}
namespace _ { // private
......@@ -1108,7 +1119,7 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() {
Own<_::PromiseNode> intermediate(
heap<_::AdapterPromiseNode<_::FixVoid<T>, _::PromiseAndFulfillerAdapter<T>>>(*wrapper));
Promise<_::JoinPromises<T>> promise(false,
_::ReducePromises<T> promise(false,
_::maybeChain(kj::mv(intermediate), implicitCast<T*>(nullptr)));
return PromiseFulfillerPair<T> { kj::mv(promise), kj::mv(wrapper) };
......
......@@ -45,15 +45,28 @@ Promise<void> joinPromises(Array<Promise<void>>&& promises);
namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; };
template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T>
Promise<T> chainPromiseType(T*);
template <typename T>
Promise<T> chainPromiseType(Promise<T>*);
template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
//
// TODO(cleanup): Rename to avoid confusion with joinPromises() call which is completely
// unrelated.
using ChainPromises = decltype(chainPromiseType((T*)nullptr));
// Constructs a promise for T, reducing double-promises. That is, if T is Promise<U>, resolves to
// Promise<U>, otherwise resolves to Promise<T>.
template <typename T>
Promise<T> reducePromiseType(T*, ...);
template <typename T>
Promise<T> reducePromiseType(Promise<T>*, ...);
template <typename T, typename Reduced = decltype(T::reducePromise(kj::instance<Promise<T>>()))>
Reduced reducePromiseType(T*, bool);
template <typename T>
using ReducePromises = decltype(reducePromiseType((T*)nullptr, false));
// Like ChainPromises, but also takes into account whether T has a method `reducePromise` that
// reduces Promise<T> to something else. In particular this allows Promise<capnp::RemotePromise<U>>
// to reduce to capnp::RemotePromise<U>.
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
......@@ -90,7 +103,7 @@ using ReturnType = typename ReturnType_<Func, T>::Type;
template <typename T> struct SplitTuplePromise_ { typedef Promise<T> Type; };
template <typename... T>
struct SplitTuplePromise_<kj::_::Tuple<T...>> {
typedef kj::Tuple<Promise<JoinPromises<T>>...> Type;
typedef kj::Tuple<ReducePromises<T>...> Type;
};
template <typename T>
......
......@@ -44,7 +44,7 @@ template <typename T>
struct PromiseFulfillerPair;
template <typename Func, typename T>
using PromiseForResult = Promise<_::JoinPromises<_::ReturnType<Func, T>>>;
using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>;
// Evaluates to the type of Promise for the result of calling functor type Func with parameter type
// T. If T is void, then the promise is for the result of calling Func with no arguments. If
// Func itself returns a promise, the promises are joined, so you never get Promise<Promise<T>>.
......@@ -482,7 +482,7 @@ Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams);
template <typename T>
struct PromiseFulfillerPair {
Promise<_::JoinPromises<T>> promise;
_::ReducePromises<T> promise;
Own<PromiseFulfiller<T>> fulfiller;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment