Commit 4e7967a9 authored by Kenton Varda's avatar Kenton Varda

Make all Promise methods consistently consume the promise (returning a new…

Make all Promise methods consistently consume the promise (returning a new promise when it makes sense), rename daemonize -> detach, and make eagerlyEvaluate() require an error handler (this caught several places where I forgot to use one).
parent 8d5f8f9c
......@@ -196,8 +196,7 @@ TEST(Capability, AsyncCancelation) {
auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false;
auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate();
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
int callCount = 0;
......@@ -212,8 +211,7 @@ TEST(Capability, AsyncCancelation) {
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) {
returned = true;
});
promise.eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
......@@ -249,8 +247,7 @@ TEST(Capability, SyncCancelation) {
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) {
returned = true;
});
promise.eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
......
......@@ -203,11 +203,10 @@ public:
// We daemonize one branch, but only after joining it with the promise that fires if
// cancellation is allowed.
auto daemonPromise = forked.addBranch();
daemonPromise.attach(kj::addRef(*context));
daemonPromise.exclusiveJoin(kj::mv(cancelPaf.promise));
// Daemonize, ignoring exceptions.
daemonPromise.daemonize([](kj::Exception&&) {});
forked.addBranch()
.attach(kj::addRef(*context))
.exclusiveJoin(kj::mv(cancelPaf.promise))
.detach([](kj::Exception&&) {}); // ignore exceptions
// Now the other branch returns the response from the context.
auto contextPtr = context.get();
......@@ -218,7 +217,7 @@ public:
// We also want to notify the context that cancellation was requested if this branch is
// destroyed.
promise.attach(LocalCallContext::Canceler(kj::mv(context)));
promise = promise.attach(LocalCallContext::Canceler(kj::mv(context)));
// We return the other branch.
return RemotePromise<ObjectPointer>(
......@@ -250,12 +249,11 @@ class QueuedPipeline final: public PipelineHook, public kj::Refcounted {
public:
QueuedPipeline(kj::Promise<kj::Own<PipelineHook>>&& promiseParam)
: promise(promiseParam.fork()),
selfResolutionOp(promise.addBranch().then(
[this](kj::Own<PipelineHook>&& inner) {
redirect = kj::mv(inner);
})) {
selfResolutionOp.eagerlyEvaluate();
}
selfResolutionOp(promise.addBranch().then([this](kj::Own<PipelineHook>&& inner) {
redirect = kj::mv(inner);
}, [this](kj::Exception&& exception) {
redirect = newBrokenPipeline(kj::mv(exception));
}).eagerlyEvaluate(nullptr)) {}
kj::Own<PipelineHook> addRef() override {
return kj::addRef(*this);
......@@ -288,14 +286,13 @@ class QueuedClient final: public ClientHook, public kj::Refcounted {
public:
QueuedClient(kj::Promise<kj::Own<ClientHook>>&& promiseParam)
: promise(promiseParam.fork()),
selfResolutionOp(promise.addBranch().then(
[this](kj::Own<ClientHook>&& inner) {
redirect = kj::mv(inner);
})),
selfResolutionOp(promise.addBranch().then([this](kj::Own<ClientHook>&& inner) {
redirect = kj::mv(inner);
}, [this](kj::Exception&& exception) {
redirect = newBrokenCap(kj::mv(exception));
}).eagerlyEvaluate(nullptr)),
promiseForCallForwarding(promise.addBranch().fork()),
promiseForClientResolution(promise.addBranch().fork()) {
selfResolutionOp.eagerlyEvaluate();
}
promiseForClientResolution(promise.addBranch().fork()) {}
Request<ObjectPointer, ObjectPointer> newCall(
uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) override {
......@@ -470,10 +467,7 @@ public:
auto promise = kj::evalLater([this,interfaceId,methodId,contextPtr]() {
return server->dispatchCall(interfaceId, methodId,
CallContext<ObjectPointer, ObjectPointer>(*contextPtr));
});
// Make sure that this client cannot be destroyed until the promise completes.
promise.attach(kj::addRef(*this));
}).attach(kj::addRef(*this));
// We have to fork this promise for the pipeline to receive a copy of the answer.
auto forked = promise.fork();
......@@ -488,10 +482,9 @@ public:
return kj::mv(pipeline.hook);
});
pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
pipelinePromise = pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
auto completionPromise = forked.addBranch();
completionPromise.attach(kj::mv(context));
auto completionPromise = forked.addBranch().attach(kj::mv(context));
return VoidPromiseAndPipeline { kj::mv(completionPromise),
kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise)) };
......
......@@ -240,9 +240,7 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
// Arrange to destroy the server context when all references are gone, or when the
// EzRpcServer is destroyed (which will destroy the TaskSet).
auto promise = server->network.onDrained();
promise.attach(kj::mv(server));
tasks.add(kj::mv(promise));
tasks.add(server->network.onDrained().attach(kj::mv(server)));
})));
}
......
......@@ -575,8 +575,7 @@ TEST(Rpc, AsyncCancelation) {
auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false;
auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate();
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
.castAs<test::TestMoreStuff>();
......@@ -590,8 +589,7 @@ TEST(Rpc, AsyncCancelation) {
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) {
returned = true;
});
promise.eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
......@@ -630,8 +628,7 @@ TEST(Rpc, SyncCancelation) {
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) {
returned = true;
});
promise.eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
......@@ -698,8 +695,7 @@ TEST(Rpc, RetainAndRelease) {
auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false;
auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate();
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
{
auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
......@@ -765,8 +761,7 @@ TEST(Rpc, Cancel) {
auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false;
auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate();
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
{
auto request = client.neverReturnRequest();
......@@ -797,8 +792,7 @@ TEST(Rpc, SendTwice) {
auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false;
auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate();
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
auto cap = test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
......
......@@ -83,11 +83,9 @@ public:
}, [&](kj::Exception&& exception) {
// Exception during write!
network.disconnectFulfiller->fulfill();
});
promise.eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
return kj::mv(promise);
});
network.previousWrite.attach(kj::addRef(*this));
}).attach(kj::addRef(*this));
}
private:
......
......@@ -273,7 +273,7 @@ public:
auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
question.selfRef = *questionRef;
paf.promise.attach(kj::addRef(*questionRef));
paf.promise = paf.promise.attach(kj::addRef(*questionRef));
{
auto message = connection->newOutgoingMessage(
......@@ -865,6 +865,10 @@ private:
resolve(kj::mv(resolution));
}, [this](kj::Exception&& exception) {
resolve(newBrokenCap(kj::mv(exception)));
}).eagerlyEvaluate([&](kj::Exception&& e) {
// Make any exceptions thrown from resolve() go to the connection's TaskSet which
// will cause the connection to be terminated.
connectionState.tasks.add(kj::mv(e));
})) {
// Create a client that starts out forwarding all calls to `initial` but, once `eventual`
// resolves, will forward there instead. In addition, `whenMoreResolved()` will return a fork
......@@ -872,13 +876,6 @@ private:
// the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
// anything that needs to stay alive in order to resolve it correctly (such as making sure the
// import ID is not released).
// Make any exceptions thrown from resolveSelfPromise go to the connection's TaskSet which
// will cause the connection to be terminated.
resolveSelfPromise = resolveSelfPromise.then(
[]() {}, [&](kj::Exception&& e) { connectionState.tasks.add(kj::mv(e)); });
resolveSelfPromise.eagerlyEvaluate();
}
~PromiseClient() noexcept(false) {
......@@ -1080,7 +1077,7 @@ private:
// to eventually resolve to the ClientHook produced by `promise`. This method waits for that
// resolve to happen and then sends the appropriate `Resolve` message to the peer.
auto result = promise.then(
return promise.then(
[this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
// Successful resolution.
......@@ -1136,9 +1133,10 @@ private:
resolve.setPromiseId(exportId);
fromException(exception, resolve.initException());
message->send();
}).eagerlyEvaluate([this](kj::Exception&& exception) {
// Put the exception on the TaskSet which will cause the connection to be terminated.
tasks.add(kj::mv(exception));
});
result.eagerlyEvaluate();
return kj::mv(result);
}
// =====================================================================================
......@@ -1296,7 +1294,7 @@ private:
auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
import.promiseFulfiller = kj::mv(paf.fulfiller);
paf.promise.attach(kj::addRef(*importClient));
paf.promise = paf.promise.attach(kj::addRef(*importClient));
result = kj::refcounted<PromiseClient>(
connectionState, kj::mv(importClient), kj::mv(paf.promise), importId);
} else {
......@@ -1683,8 +1681,7 @@ private:
message->send();
result.promise = kj::mv(paf.promise);
result.promise.attach(kj::addRef(*result.questionRef));
result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
return kj::mv(result);
}
......@@ -1701,10 +1698,13 @@ private:
resolve(kj::mv(response));
}, [this](kj::Exception&& exception) {
resolve(kj::mv(exception));
}).eagerlyEvaluate([&](kj::Exception&& e) {
// Make any exceptions thrown from resolve() go to the connection's TaskSet which
// will cause the connection to be terminated.
connectionState.tasks.add(kj::mv(e));
})) {
// Construct a new RpcPipeline.
resolveSelfPromise.eagerlyEvaluate();
state.init<Waiting>(kj::mv(questionRef));
}
......@@ -2347,15 +2347,15 @@ private:
auto forked = resultsPromise.fork();
answer.redirectedResults = forked.addBranch();
auto promise = kj::mv(cancelPaf.promise);
promise.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}));
promise.daemonize([](kj::Exception&&) {});
cancelPaf.promise
.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
.detach([](kj::Exception&&) {});
} else {
// Hack: Both the success and error continuations need to use the context. We could
// refcount, but both will be destroyed at the same time anyway.
RpcCallContext* contextPtr = context;
auto promise = promiseAndPipeline.promise.then(
promiseAndPipeline.promise.then(
[contextPtr]() {
contextPtr->sendReturn();
}, [contextPtr](kj::Exception&& exception) {
......@@ -2363,10 +2363,9 @@ private:
}).then([]() {}, [&](kj::Exception&& exception) {
// Handle exceptions that occur in sendReturn()/sendErrorReturn().
taskFailed(kj::mv(exception));
});
promise.attach(kj::mv(context));
promise.exclusiveJoin(kj::mv(cancelPaf.promise));
promise.daemonize([](kj::Exception&&) {});
}).attach(kj::mv(context))
.exclusiveJoin(kj::mv(cancelPaf.promise))
.detach([](kj::Exception&&) {});
}
}
}
......
......@@ -1015,7 +1015,7 @@ kj::Promise<void> TestMoreStuffImpl::neverReturnAdvanced(
neverFulfill = kj::mv(paf.fulfiller);
// Attach `cap` to the promise to make sure it is released.
paf.promise.attach(context.getParams().getCap());
paf.promise = paf.promise.attach(context.getParams().getCap());
// Also attach `cap` to the result struct to make sure that is released.
context.getResults().setCapCopy(context.getParams().getCap());
......
......@@ -617,20 +617,34 @@ Promise<T> ForkedPromise<T>::addBranch() {
}
template <typename T>
void Promise<T>::exclusiveJoin(Promise<T>&& other) {
node = heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node));
Promise<T> Promise<T>::exclusiveJoin(Promise<T>&& other) {
return Promise(false, heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node)));
}
template <typename T>
template <typename... Attachments>
void Promise<T>::attach(Attachments&&... attachments) {
node = kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...));
Promise<T> Promise<T>::attach(Attachments&&... attachments) {
return Promise(false, kj::heap<_::AttachmentPromiseNode<Tuple<Attachments...>>>(
kj::mv(node), kj::tuple(kj::fwd<Attachments>(attachments)...)));
}
template <typename T>
void Promise<T>::eagerlyEvaluate() {
node = _::spark<_::FixVoid<T>>(kj::mv(node));
template <typename ErrorFunc>
Promise<T> Promise<T>::eagerlyEvaluate(ErrorFunc&& errorHandler) {
return Promise(false, _::spark<_::FixVoid<T>>(
then([](T&& value) -> T { return kj::mv(value); }, kj::fwd<ErrorFunc>(errorHandler)).node));
}
template <>
template <typename ErrorFunc>
Promise<void> Promise<void>::eagerlyEvaluate(ErrorFunc&& errorHandler) {
return Promise(false, _::spark<_::Void>(
then([]() {}, kj::fwd<ErrorFunc>(errorHandler)).node));
}
template <typename T>
Promise<T> Promise<T>::eagerlyEvaluate(decltype(nullptr)) {
return Promise(false, _::spark<_::FixVoid<T>>(kj::mv(node)));
}
template <typename T>
......@@ -645,14 +659,14 @@ inline PromiseForResult<Func, void> evalLater(Func&& func) {
template <typename T>
template <typename ErrorFunc>
void Promise<T>::daemonize(ErrorFunc&& errorHandler) {
return _::daemonize(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
void Promise<T>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([](T&&) {}, kj::fwd<ErrorFunc>(errorHandler)));
}
template <>
template <typename ErrorFunc>
void Promise<void>::daemonize(ErrorFunc&& errorHandler) {
return _::daemonize(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
void Promise<void>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
}
// =======================================================================================
......
......@@ -48,7 +48,7 @@ TEST(AsyncIo, SimpleNetwork) {
}).then([&](Own<AsyncIoStream>&& result) {
client = kj::mv(result);
return client->write("foo", 3);
}).daemonize([](kj::Exception&& exception) {
}).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
......@@ -102,7 +102,7 @@ TEST(AsyncIo, OneWayPipe) {
auto pipe = ioContext.provider->newOneWayPipe();
char receiveBuffer[4];
pipe.out->write("foo", 3).daemonize([](kj::Exception&& exception) {
pipe.out->write("foo", 3).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
......
......@@ -649,9 +649,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost(
}));
auto reader = heap<LookupReader>(kj::mv(thread), kj::mv(input));
auto result = reader->read();
result.attach(kj::mv(reader));
return kj::mv(result);
return reader->read().attach(kj::mv(reader));
}
// =======================================================================================
......
......@@ -173,7 +173,7 @@ private:
friend class TaskSetImpl;
};
void daemonize(kj::Promise<void>&& promise);
void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
Promise<void> yield();
......
......@@ -259,8 +259,7 @@ TEST(Async, Ordering) {
auto paf = kj::newPromiseAndFulfiller<void>();
promises[2] = paf.promise.then([&]() {
EXPECT_EQ(1, counter++);
});
promises[2].eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
paf.fulfiller->fulfill();
}
......@@ -270,25 +269,21 @@ TEST(Async, Ordering) {
EXPECT_EQ(4, counter++);
}).then([&]() {
EXPECT_EQ(5, counter++);
});
promises[3].eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
{
auto paf = kj::newPromiseAndFulfiller<void>();
promises[4] = paf.promise.then([&]() {
EXPECT_EQ(2, counter++);
});
promises[4].eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
paf.fulfiller->fulfill();
}
// evalLater() is like READY_NOW.then().
promises[5] = evalLater([&]() {
EXPECT_EQ(6, counter++);
});
promises[5].eagerlyEvaluate();
});
promises[1].eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
}).eagerlyEvaluate(nullptr);
promises[0] = evalLater([&]() {
EXPECT_EQ(3, counter++);
......@@ -296,8 +291,7 @@ TEST(Async, Ordering) {
// Making this a chain should NOT cause it to preempt promises[1]. (This was a problem at one
// point.)
return Promise<void>(READY_NOW);
});
promises[0].eagerlyEvaluate();
}).eagerlyEvaluate(nullptr);
for (auto i: indices(promises)) {
kj::mv(promises[i]).wait(waitScope);
......@@ -372,9 +366,7 @@ TEST(Async, ExclusiveJoin) {
auto left = evalLater([&]() { return 123; });
auto right = newPromiseAndFulfiller<int>(); // never fulfilled
left.exclusiveJoin(kj::mv(right.promise));
EXPECT_EQ(123, left.wait(waitScope));
EXPECT_EQ(123, left.exclusiveJoin(kj::mv(right.promise)).wait(waitScope));
}
{
......@@ -384,9 +376,7 @@ TEST(Async, ExclusiveJoin) {
auto left = newPromiseAndFulfiller<int>(); // never fulfilled
auto right = evalLater([&]() { return 123; });
left.promise.exclusiveJoin(kj::mv(right));
EXPECT_EQ(123, left.promise.wait(waitScope));
EXPECT_EQ(123, left.promise.exclusiveJoin(kj::mv(right)).wait(waitScope));
}
{
......@@ -396,9 +386,7 @@ TEST(Async, ExclusiveJoin) {
auto left = evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 456; });
left.exclusiveJoin(kj::mv(right));
EXPECT_EQ(123, left.wait(waitScope));
EXPECT_EQ(123, left.exclusiveJoin(kj::mv(right)).wait(waitScope));
}
{
......@@ -406,13 +394,9 @@ TEST(Async, ExclusiveJoin) {
WaitScope waitScope(loop);
auto left = evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 456; });
right.eagerlyEvaluate();
auto right = evalLater([&]() { return 456; }).eagerlyEvaluate(nullptr);
left.exclusiveJoin(kj::mv(right));
EXPECT_EQ(456, left.wait(waitScope));
EXPECT_EQ(456, left.exclusiveJoin(kj::mv(right)).wait(waitScope));
}
}
......@@ -474,9 +458,7 @@ TEST(Async, Attach) {
Promise<int> promise = evalLater([&]() {
EXPECT_FALSE(destroyed);
return 123;
});
promise.attach(kj::heap<DestructorDetector>(destroyed));
}).attach(kj::heap<DestructorDetector>(destroyed));
promise = promise.then([&](int i) {
EXPECT_TRUE(destroyed);
......@@ -501,14 +483,14 @@ TEST(Async, EagerlyEvaluate) {
EXPECT_FALSE(called);
promise.eagerlyEvaluate();
promise = promise.eagerlyEvaluate(nullptr);
evalLater([]() {}).wait(waitScope);
EXPECT_TRUE(called);
}
TEST(Async, Daemonize) {
TEST(Async, Detach) {
EventLoop loop;
WaitScope waitScope(loop);
......@@ -517,8 +499,8 @@ TEST(Async, Daemonize) {
bool ran3 = false;
evalLater([&]() { ran1 = true; });
evalLater([&]() { ran2 = true; }).daemonize([](kj::Exception&&) { ADD_FAILURE(); });
evalLater([]() { KJ_FAIL_ASSERT("foo"); }).daemonize([&](kj::Exception&& e) { ran3 = true; });
evalLater([&]() { ran2 = true; }).detach([](kj::Exception&&) { ADD_FAILURE(); });
evalLater([]() { KJ_FAIL_ASSERT("foo"); }).detach([&](kj::Exception&& e) { ran3 = true; });
EXPECT_FALSE(ran1);
EXPECT_FALSE(ran2);
......@@ -553,8 +535,7 @@ TEST(Async, SetRunnable) {
EXPECT_EQ(0, port.callCount);
{
auto promise = evalLater([]() {});
promise.eagerlyEvaluate();
auto promise = evalLater([]() {}).eagerlyEvaluate(nullptr);
EXPECT_TRUE(port.runnable);
loop.run(1);
......@@ -568,12 +549,10 @@ TEST(Async, SetRunnable) {
{
auto paf = newPromiseAndFulfiller<void>();
auto promise = paf.promise.then([]() {});
promise.eagerlyEvaluate();
auto promise = paf.promise.then([]() {}).eagerlyEvaluate(nullptr);
EXPECT_FALSE(port.runnable);
auto promise2 = evalLater([]() {});
promise2.eagerlyEvaluate();
auto promise2 = evalLater([]() {}).eagerlyEvaluate(nullptr);
paf.fulfiller->fulfill();
EXPECT_TRUE(port.runnable);
......
......@@ -92,7 +92,7 @@ TEST_F(AsyncUnixTest, SignalsMultiListen) {
port.onSignal(SIGIO).then([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal.";
}).daemonize([](kj::Exception&& exception) {
}).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
......@@ -152,12 +152,12 @@ TEST_F(AsyncUnixTest, SignalsNoWait) {
receivedSigusr2 = true;
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}).daemonize([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
port.onSignal(SIGIO).then([&](siginfo_t&& info) {
receivedSigio = true;
EXPECT_EQ(SIGIO, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}).daemonize([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
kill(getpid(), SIGUSR2);
kill(getpid(), SIGIO);
......@@ -205,7 +205,7 @@ TEST_F(AsyncUnixTest, PollMultiListen) {
port.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) {
ADD_FAILURE() << "Received wrong poll.";
}).daemonize([](kj::Exception&& exception) {
}).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
......@@ -273,11 +273,11 @@ TEST_F(AsyncUnixTest, PollNoWait) {
port.onFdEvent(pipefds[0], POLLIN | POLLPRI).then([&](short&& events) {
receivedCount++;
EXPECT_EQ(POLLIN, events);
}).daemonize([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
port.onFdEvent(pipefds2[0], POLLIN | POLLPRI).then([&](short&& events) {
receivedCount++;
EXPECT_EQ(POLLIN, events);
}).daemonize([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
KJ_SYSCALL(write(pipefds[1], "foo", 3));
KJ_SYSCALL(write(pipefds2[1], "bar", 3));
......
......@@ -331,7 +331,7 @@ Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
void daemonize(kj::Promise<void>&& promise) {
void detach(kj::Promise<void>&& promise) {
EventLoop& loop = currentEventLoop();
KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
loop.daemons->add(kj::mv(promise));
......
......@@ -66,7 +66,11 @@ class Promise: protected _::PromiseBase {
//
// Promises are linear types -- they are moveable but not copyable. If a Promise is destroyed
// or goes out of scope (without being moved elsewhere), any ongoing asynchronous operations
// meant to fulfill the promise will be canceled if possible.
// meant to fulfill the promise will be canceled if possible. All methods of `Promise` (unless
// otherwise noted) actually consume the promise in the sense of move semantics. (Arguably they
// should be rvalue-qualified, but at the time this interface was created compilers didn't widely
// support that yet and anyway it would be pretty ugly typing kj::mv(promise).whatever().) If
// you want to use one Promise in two different places, you must fork it with `fork()`.
//
// To use the result of a Promise, you must call `then()` and supply a callback function to
// call with the result. `then()` returns another promise, for the result of the callback.
......@@ -161,8 +165,9 @@ public:
// If the returned promise is destroyed before the callback runs, the callback will be canceled
// (it will never run).
//
// Note that `then()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid, but `then()` returns a new promise.
// Note that `then()` -- like all other Promise methods -- consumes the promise on which it is
// called, in the sense of move semantics. After returning, the original promise is no longer
// valid, but `then()` returns a new promise.
//
// *Advanced implementation tips:* Most users will never need to worry about the below, but
// it is good to be aware of.
......@@ -216,9 +221,6 @@ public:
// around them in arbitrary ways. Therefore, callers really need to know if a function they
// are calling might wait(), and the `WaitScope&` parameter makes this clear.
//
// Note that `wait()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
......@@ -227,31 +229,36 @@ public:
// `T` must be copy-constructable for this to work. Or, in the special case where `T` is
// `Own<U>`, `U` must have a method `Own<U> addRef()` which returns a new reference to the same
// (or an equivalent) object (probably implemented via reference counting).
//
// Note that `fork()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the original promise is no longer valid.
void exclusiveJoin(Promise<T>&& other);
// Replace this promise with one that resolves when either the original promise resolves or
// `other` resolves (whichever comes first). The promise that didn't resolve first is canceled.
Promise<T> exclusiveJoin(Promise<T>&& other) KJ_WARN_UNUSED_RESULT;
// Return a new promise that resolves when either the original promise resolves or `other`
// resolves (whichever comes first). The promise that didn't resolve first is canceled.
// TODO(someday): inclusiveJoin(), or perhaps just join(), which waits for both completions
// and produces a tuple?
template <typename... Attachments>
void attach(Attachments&&... attachments);
Promise<T> attach(Attachments&&... attachments) KJ_WARN_UNUSED_RESULT;
// "Attaches" one or more movable objects (often, Own<T>s) to the promise, such that they will
// be destroyed when the promise resolves. This is useful when a promise's callback contains
// pointers into some object and you want to make sure the object still exists when the callback
// runs -- after calling then(), use attach() to add necessary objects to the result.
void eagerlyEvaluate();
template <typename ErrorFunc>
Promise<T> eagerlyEvaluate(ErrorFunc&& errorHandler) KJ_WARN_UNUSED_RESULT;
Promise<T> eagerlyEvaluate(decltype(nullptr)) KJ_WARN_UNUSED_RESULT;
// Force eager evaluation of this promise. Use this if you are going to hold on to the promise
// for awhile without consuming the result, but you want to make sure that the system actually
// processes it.
//
// `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to
// `then()`, except that it must return void. We make you specify this because otherwise it's
// easy to forget to handle errors in a promise that you never use. You may specify nullptr for
// the error handler if you are sure that ignoring errors is fine, or if you know that you'll
// eventually wait on the promise somewhere.
template <typename ErrorFunc>
void daemonize(ErrorFunc&& errorHandler);
void detach(ErrorFunc&& errorHandler);
// Allows the promise to continue running in the background until it completes or the
// `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
// promise, you need to make sure that the promise owns all the objects it touches or make sure
......@@ -262,12 +269,10 @@ public:
//
// This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
// canceled unless the callee explicitly permits it.
//
// Note that `daemonize()` consumes the promise on which it is called, in the sense of move
// semantics. After returning, the original promise is no longer valid.
kj::String trace();
// Returns a dump of debug info about this promise. Not for production use. Requires RTTI.
// This method does NOT consume the promise as other methods do.
private:
Promise(bool, Own<_::PromiseNode>&& node): PromiseBase(kj::mv(node)) {}
......@@ -462,7 +467,7 @@ class TaskSet {
//
// This is useful for "daemon" objects that perform background tasks which aren't intended to
// fulfill any particular external promise, but which may need to be canceled (and thus can't
// use `Promise::daemonize()`). The daemon object holds a TaskSet to collect these tasks it is
// use `Promise::detach()`). The daemon object holds a TaskSet to collect these tasks it is
// working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed as well,
// and everything the daemon is doing is canceled.
......@@ -593,7 +598,7 @@ private:
void enterScope();
void leaveScope();
friend void _::daemonize(kj::Promise<void>&& promise);
friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend class _::Event;
......
linux-gcc-4.7 1771 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1774 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1794 ./super-test.sh tmpdir capnp-clang quick clang
linux-gcc-4.7 1773 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1776 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1796 ./super-test.sh tmpdir capnp-clang quick clang
mac 807 ./super-test.sh remote beat caffeinate quick
cygwin 812 ./super-test.sh remote Kenton@flashman quick
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment