Commit 4158ca9f authored by Kenton Varda's avatar Kenton Varda

Remove most of EventLoop interface in favor of equivalent Promise methods.

parent 6def52fd
...@@ -67,7 +67,7 @@ TEST(Capability, Basic) { ...@@ -67,7 +67,7 @@ TEST(Capability, Basic) {
bool barFailed = false; bool barFailed = false;
auto request3 = client.barRequest(); auto request3 = client.barRequest();
auto promise3 = loop.there(request3.send(), auto promise3 = request3.send().then(
[](Response<test::TestInterface::BarResults>&& response) { [](Response<test::TestInterface::BarResults>&& response) {
ADD_FAILURE() << "Expected bar() call to fail."; ADD_FAILURE() << "Expected bar() call to fail.";
}, [&](kj::Exception&& e) { }, [&](kj::Exception&& e) {
...@@ -76,13 +76,13 @@ TEST(Capability, Basic) { ...@@ -76,13 +76,13 @@ TEST(Capability, Basic) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("foo", response1.getX()); EXPECT_EQ("foo", response1.getX());
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
loop.wait(kj::mv(promise3)); promise3.wait();
EXPECT_EQ(2, callCount); EXPECT_EQ(2, callCount);
EXPECT_TRUE(barFailed); EXPECT_TRUE(barFailed);
...@@ -105,11 +105,11 @@ TEST(Capability, Inheritance) { ...@@ -105,11 +105,11 @@ TEST(Capability, Inheritance) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("bar", response1.getX()); EXPECT_EQ("bar", response1.getX());
...@@ -141,10 +141,10 @@ TEST(Capability, Pipelining) { ...@@ -141,10 +141,10 @@ TEST(Capability, Pipelining) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
EXPECT_EQ(0, chainedCallCount); EXPECT_EQ(0, chainedCallCount);
auto response = loop.wait(kj::mv(pipelinePromise)); auto response = pipelinePromise.wait();
EXPECT_EQ("bar", response.getX()); EXPECT_EQ("bar", response.getX());
auto response2 = loop.wait(kj::mv(pipelinePromise2)); auto response2 = pipelinePromise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
EXPECT_EQ(3, callCount); EXPECT_EQ(3, callCount);
...@@ -168,7 +168,7 @@ TEST(Capability, TailCall) { ...@@ -168,7 +168,7 @@ TEST(Capability, TailCall) {
auto dependentCall0 = promise.getC().getCallSequenceRequest().send(); auto dependentCall0 = promise.getC().getCallSequenceRequest().send();
auto response = loop.wait(kj::mv(promise)); auto response = promise.wait();
EXPECT_EQ(456, response.getI()); EXPECT_EQ(456, response.getI());
EXPECT_EQ(456, response.getI()); EXPECT_EQ(456, response.getI());
...@@ -176,9 +176,9 @@ TEST(Capability, TailCall) { ...@@ -176,9 +176,9 @@ TEST(Capability, TailCall) {
auto dependentCall2 = response.getC().getCallSequenceRequest().send(); auto dependentCall2 = response.getC().getCallSequenceRequest().send();
EXPECT_EQ(0, loop.wait(kj::mv(dependentCall0)).getN()); EXPECT_EQ(0, dependentCall0.wait().getN());
EXPECT_EQ(1, loop.wait(kj::mv(dependentCall1)).getN()); EXPECT_EQ(1, dependentCall1.wait().getN());
EXPECT_EQ(2, loop.wait(kj::mv(dependentCall2)).getN()); EXPECT_EQ(2, dependentCall2.wait().getN());
EXPECT_EQ(1, calleeCallCount); EXPECT_EQ(1, calleeCallCount);
EXPECT_EQ(1, callerCallCount); EXPECT_EQ(1, callerCallCount);
...@@ -191,7 +191,7 @@ TEST(Capability, AsyncCancelation) { ...@@ -191,7 +191,7 @@ TEST(Capability, AsyncCancelation) {
auto paf = kj::newPromiseAndFulfiller<void>(); auto paf = kj::newPromiseAndFulfiller<void>();
bool destroyed = false; bool destroyed = false;
auto destructionPromise = loop.there(kj::mv(paf.promise), [&]() { destroyed = true; }); auto destructionPromise = paf.promise.then([&]() { destroyed = true; });
destructionPromise.eagerlyEvaluate(); destructionPromise.eagerlyEvaluate();
int callCount = 0; int callCount = 0;
...@@ -204,21 +204,21 @@ TEST(Capability, AsyncCancelation) { ...@@ -204,21 +204,21 @@ TEST(Capability, AsyncCancelation) {
{ {
auto request = client.expectAsyncCancelRequest(); auto request = client.expectAsyncCancelRequest();
request.setCap(test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)))); request.setCap(test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))));
promise = loop.there(request.send(), promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) { [&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) {
returned = true; returned = true;
}); });
promise.eagerlyEvaluate(); promise.eagerlyEvaluate();
} }
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
// We can detect that the method was canceled because it will drop the cap. // We can detect that the method was canceled because it will drop the cap.
EXPECT_FALSE(destroyed); EXPECT_FALSE(destroyed);
EXPECT_FALSE(returned); EXPECT_FALSE(returned);
promise = nullptr; // request cancellation promise = nullptr; // request cancellation
loop.wait(kj::mv(destructionPromise)); destructionPromise.wait();
EXPECT_TRUE(destroyed); EXPECT_TRUE(destroyed);
EXPECT_FALSE(returned); EXPECT_FALSE(returned);
...@@ -240,16 +240,16 @@ TEST(Capability, SyncCancelation) { ...@@ -240,16 +240,16 @@ TEST(Capability, SyncCancelation) {
{ {
auto request = client.expectSyncCancelRequest(); auto request = client.expectSyncCancelRequest();
request.setCap(test::TestInterface::Client(kj::heap<TestInterfaceImpl>(innerCallCount))); request.setCap(test::TestInterface::Client(kj::heap<TestInterfaceImpl>(innerCallCount)));
promise = loop.there(request.send(), promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) { [&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) {
returned = true; returned = true;
}); });
promise.eagerlyEvaluate(); promise.eagerlyEvaluate();
} }
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
// expectSyncCancel() will make a call to the TestInterfaceImpl only once it noticed isCanceled() // expectSyncCancel() will make a call to the TestInterfaceImpl only once it noticed isCanceled()
// is true. // is true.
...@@ -257,10 +257,10 @@ TEST(Capability, SyncCancelation) { ...@@ -257,10 +257,10 @@ TEST(Capability, SyncCancelation) {
EXPECT_FALSE(returned); EXPECT_FALSE(returned);
promise = nullptr; // request cancellation promise = nullptr; // request cancellation
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
loop.wait(loop.evalLater([]() {})); kj::evalLater([]() {}).wait();
EXPECT_EQ(1, innerCallCount); EXPECT_EQ(1, innerCallCount);
EXPECT_FALSE(returned); EXPECT_FALSE(returned);
...@@ -286,7 +286,7 @@ TEST(Capability, DynamicClient) { ...@@ -286,7 +286,7 @@ TEST(Capability, DynamicClient) {
bool barFailed = false; bool barFailed = false;
auto request3 = client.newRequest("bar"); auto request3 = client.newRequest("bar");
auto promise3 = loop.there(request3.send(), auto promise3 = request3.send().then(
[](Response<DynamicStruct>&& response) { [](Response<DynamicStruct>&& response) {
ADD_FAILURE() << "Expected bar() call to fail."; ADD_FAILURE() << "Expected bar() call to fail.";
}, [&](kj::Exception&& e) { }, [&](kj::Exception&& e) {
...@@ -295,13 +295,13 @@ TEST(Capability, DynamicClient) { ...@@ -295,13 +295,13 @@ TEST(Capability, DynamicClient) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("foo", response1.get("x").as<Text>()); EXPECT_EQ("foo", response1.get("x").as<Text>());
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
loop.wait(kj::mv(promise3)); promise3.wait();
EXPECT_EQ(2, callCount); EXPECT_EQ(2, callCount);
EXPECT_TRUE(barFailed); EXPECT_TRUE(barFailed);
...@@ -332,11 +332,11 @@ TEST(Capability, DynamicClientInheritance) { ...@@ -332,11 +332,11 @@ TEST(Capability, DynamicClientInheritance) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
checkDynamicTestMessage(response2.as<DynamicStruct>()); checkDynamicTestMessage(response2.as<DynamicStruct>());
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("bar", response1.get("x").as<Text>()); EXPECT_EQ("bar", response1.get("x").as<Text>());
...@@ -371,10 +371,10 @@ TEST(Capability, DynamicClientPipelining) { ...@@ -371,10 +371,10 @@ TEST(Capability, DynamicClientPipelining) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
EXPECT_EQ(0, chainedCallCount); EXPECT_EQ(0, chainedCallCount);
auto response = loop.wait(kj::mv(pipelinePromise)); auto response = pipelinePromise.wait();
EXPECT_EQ("bar", response.get("x").as<Text>()); EXPECT_EQ("bar", response.get("x").as<Text>());
auto response2 = loop.wait(kj::mv(pipelinePromise2)); auto response2 = pipelinePromise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
EXPECT_EQ(3, callCount); EXPECT_EQ(3, callCount);
...@@ -433,7 +433,7 @@ TEST(Capability, DynamicServer) { ...@@ -433,7 +433,7 @@ TEST(Capability, DynamicServer) {
bool barFailed = false; bool barFailed = false;
auto request3 = client.barRequest(); auto request3 = client.barRequest();
auto promise3 = loop.there(request3.send(), auto promise3 = request3.send().then(
[](Response<test::TestInterface::BarResults>&& response) { [](Response<test::TestInterface::BarResults>&& response) {
ADD_FAILURE() << "Expected bar() call to fail."; ADD_FAILURE() << "Expected bar() call to fail.";
}, [&](kj::Exception&& e) { }, [&](kj::Exception&& e) {
...@@ -442,13 +442,13 @@ TEST(Capability, DynamicServer) { ...@@ -442,13 +442,13 @@ TEST(Capability, DynamicServer) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("foo", response1.getX()); EXPECT_EQ("foo", response1.getX());
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
loop.wait(kj::mv(promise3)); promise3.wait();
EXPECT_EQ(2, callCount); EXPECT_EQ(2, callCount);
EXPECT_TRUE(barFailed); EXPECT_TRUE(barFailed);
...@@ -502,11 +502,11 @@ TEST(Capability, DynamicServerInheritance) { ...@@ -502,11 +502,11 @@ TEST(Capability, DynamicServerInheritance) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("bar", response1.getX()); EXPECT_EQ("bar", response1.getX());
...@@ -582,10 +582,10 @@ TEST(Capability, DynamicServerPipelining) { ...@@ -582,10 +582,10 @@ TEST(Capability, DynamicServerPipelining) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
EXPECT_EQ(0, chainedCallCount); EXPECT_EQ(0, chainedCallCount);
auto response = loop.wait(kj::mv(pipelinePromise)); auto response = pipelinePromise.wait();
EXPECT_EQ("bar", response.getX()); EXPECT_EQ("bar", response.getX());
auto response2 = loop.wait(kj::mv(pipelinePromise2)); auto response2 = pipelinePromise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
EXPECT_EQ(3, callCount); EXPECT_EQ(3, callCount);
......
...@@ -206,9 +206,8 @@ public: ...@@ -206,9 +206,8 @@ public:
auto daemonPromise = forked.addBranch(); auto daemonPromise = forked.addBranch();
daemonPromise.attach(kj::addRef(*context)); daemonPromise.attach(kj::addRef(*context));
daemonPromise.exclusiveJoin(kj::mv(cancelPaf.promise)); daemonPromise.exclusiveJoin(kj::mv(cancelPaf.promise));
// Ignore exceptions. // Daemonize, ignoring exceptions.
daemonPromise = daemonPromise.then([]() {}, [](kj::Exception&&) {}); kj::daemonize(kj::mv(daemonPromise), [](kj::Exception&&) {});
kj::EventLoop::current().daemonize(kj::mv(daemonPromise));
// Now the other branch returns the response from the context. // Now the other branch returns the response from the context.
auto contextPtr = context.get(); auto contextPtr = context.get();
......
...@@ -126,7 +126,7 @@ TEST(TwoPartyNetwork, Basic) { ...@@ -126,7 +126,7 @@ TEST(TwoPartyNetwork, Basic) {
bool barFailed = false; bool barFailed = false;
auto request3 = client.barRequest(); auto request3 = client.barRequest();
auto promise3 = loop.there(request3.send(), auto promise3 = request3.send().then(
[](Response<test::TestInterface::BarResults>&& response) { [](Response<test::TestInterface::BarResults>&& response) {
ADD_FAILURE() << "Expected bar() call to fail."; ADD_FAILURE() << "Expected bar() call to fail.";
}, [&](kj::Exception&& e) { }, [&](kj::Exception&& e) {
...@@ -135,13 +135,13 @@ TEST(TwoPartyNetwork, Basic) { ...@@ -135,13 +135,13 @@ TEST(TwoPartyNetwork, Basic) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
auto response1 = loop.wait(kj::mv(promise1)); auto response1 = promise1.wait();
EXPECT_EQ("foo", response1.getX()); EXPECT_EQ("foo", response1.getX());
auto response2 = loop.wait(kj::mv(promise2)); auto response2 = promise2.wait();
loop.wait(kj::mv(promise3)); promise3.wait();
EXPECT_EQ(2, callCount); EXPECT_EQ(2, callCount);
EXPECT_TRUE(barFailed); EXPECT_TRUE(barFailed);
...@@ -167,10 +167,8 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -167,10 +167,8 @@ TEST(TwoPartyNetwork, Pipelining) {
bool disconnected = false; bool disconnected = false;
bool drained = false; bool drained = false;
kj::Promise<void> disconnectPromise = loop.there(network.onDisconnect(), kj::Promise<void> disconnectPromise = network.onDisconnect().then([&]() { disconnected = true; });
[&]() { disconnected = true; }); kj::Promise<void> drainedPromise = network.onDrained().then([&]() { drained = true; });
kj::Promise<void> drainedPromise = loop.there(network.onDrained(),
[&]() { drained = true; });
{ {
// Request the particular capability from the server. // Request the particular capability from the server.
...@@ -198,10 +196,10 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -198,10 +196,10 @@ TEST(TwoPartyNetwork, Pipelining) {
EXPECT_EQ(0, callCount); EXPECT_EQ(0, callCount);
EXPECT_EQ(0, reverseCallCount); EXPECT_EQ(0, reverseCallCount);
auto response = loop.wait(kj::mv(pipelinePromise)); auto response = pipelinePromise.wait();
EXPECT_EQ("bar", response.getX()); EXPECT_EQ("bar", response.getX());
auto response2 = loop.wait(kj::mv(pipelinePromise2)); auto response2 = pipelinePromise2.wait();
checkTestMessage(response2); checkTestMessage(response2);
EXPECT_EQ(3, callCount); EXPECT_EQ(3, callCount);
...@@ -215,7 +213,7 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -215,7 +213,7 @@ TEST(TwoPartyNetwork, Pipelining) {
thread->sendSignal(SIGUSR2); thread->sendSignal(SIGUSR2);
thread = nullptr; thread = nullptr;
loop.wait(kj::mv(disconnectPromise)); disconnectPromise.wait();
EXPECT_FALSE(drained); EXPECT_FALSE(drained);
{ {
...@@ -234,8 +232,8 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -234,8 +232,8 @@ TEST(TwoPartyNetwork, Pipelining) {
.castAs<test::TestExtends>().graultRequest(); .castAs<test::TestExtends>().graultRequest();
auto pipelinePromise2 = pipelineRequest2.send(); auto pipelinePromise2 = pipelineRequest2.send();
EXPECT_ANY_THROW(loop.wait(kj::mv(pipelinePromise))); EXPECT_ANY_THROW(pipelinePromise.wait());
EXPECT_ANY_THROW(loop.wait(kj::mv(pipelinePromise2))); EXPECT_ANY_THROW(pipelinePromise2.wait());
EXPECT_EQ(3, callCount); EXPECT_EQ(3, callCount);
EXPECT_EQ(1, reverseCallCount); EXPECT_EQ(1, reverseCallCount);
...@@ -244,7 +242,7 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -244,7 +242,7 @@ TEST(TwoPartyNetwork, Pipelining) {
EXPECT_FALSE(drained); EXPECT_FALSE(drained);
} }
loop.wait(kj::mv(drainedPromise)); drainedPromise.wait();
} }
} // namespace } // namespace
......
...@@ -2345,6 +2345,7 @@ private: ...@@ -2345,6 +2345,7 @@ private:
answer.pipeline = kj::mv(promiseAndPipeline.pipeline); answer.pipeline = kj::mv(promiseAndPipeline.pipeline);
if (redirectResults) { if (redirectResults) {
// TODO(now): Handle exceptions, dummy.
auto resultsPromise = promiseAndPipeline.promise.then( auto resultsPromise = promiseAndPipeline.promise.then(
kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) { kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
return context->consumeRedirectedResponse(); return context->consumeRedirectedResponse();
...@@ -2359,7 +2360,7 @@ private: ...@@ -2359,7 +2360,7 @@ private:
auto promise = kj::mv(cancelPaf.promise); auto promise = kj::mv(cancelPaf.promise);
promise.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){})); promise.exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}));
kj::EventLoop::current().daemonize(kj::mv(promise)); daemonize(kj::mv(promise), [](kj::Exception&&) {});
} else { } else {
// Hack: Both the success and error continuations need to use the context. We could // Hack: Both the success and error continuations need to use the context. We could
// refcount, but both will be destroyed at the same time anyway. // refcount, but both will be destroyed at the same time anyway.
...@@ -2376,7 +2377,7 @@ private: ...@@ -2376,7 +2377,7 @@ private:
}); });
promise.attach(kj::mv(context)); promise.attach(kj::mv(context));
promise.exclusiveJoin(kj::mv(cancelPaf.promise)); promise.exclusiveJoin(kj::mv(cancelPaf.promise));
kj::EventLoop::current().daemonize(kj::mv(promise)); daemonize(kj::mv(promise), [](kj::Exception&&) {});
} }
} }
} }
......
...@@ -1073,7 +1073,7 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien ...@@ -1073,7 +1073,7 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien
ADD_FAILURE() << "Looped too long, giving up."; ADD_FAILURE() << "Looped too long, giving up.";
return kj::READY_NOW; return kj::READY_NOW;
} else { } else {
return kj::EventLoop::current().evalLater([=]() mutable { return kj::evalLater([=]() mutable {
return loop(depth + 1, cap, context); return loop(depth + 1, cap, context);
}); });
} }
...@@ -1101,7 +1101,7 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien ...@@ -1101,7 +1101,7 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien
EXPECT_EQ("foo", response.getX()); EXPECT_EQ("foo", response.getX());
}); });
} else { } else {
return kj::EventLoop::current().evalLater([=]() mutable { return kj::evalLater([=]() mutable {
return loop(depth + 1, cap, context); return loop(depth + 1, cap, context);
}); });
} }
......
...@@ -41,14 +41,16 @@ TEST(AsyncIo, SimpleNetwork) { ...@@ -41,14 +41,16 @@ TEST(AsyncIo, SimpleNetwork) {
auto port = newPromiseAndFulfiller<uint>(); auto port = newPromiseAndFulfiller<uint>();
loop.daemonize(port.promise.then([&](uint portnum) { daemonize(port.promise.then([&](uint portnum) {
return network->parseRemoteAddress("127.0.0.1", portnum); return network->parseRemoteAddress("127.0.0.1", portnum);
}).then([&](Own<RemoteAddress>&& result) { }).then([&](Own<RemoteAddress>&& result) {
return result->connect(); return result->connect();
}).then([&](Own<AsyncIoStream>&& result) { }).then([&](Own<AsyncIoStream>&& result) {
client = kj::mv(result); client = kj::mv(result);
return client->write("foo", 3); return client->write("foo", 3);
})); }), [](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
kj::String result = network->parseLocalAddress("*").then([&](Own<LocalAddress>&& result) { kj::String result = network->parseLocalAddress("*").then([&](Own<LocalAddress>&& result) {
listener = result->listen(); listener = result->listen();
...@@ -95,7 +97,9 @@ TEST(AsyncIo, OneWayPipe) { ...@@ -95,7 +97,9 @@ TEST(AsyncIo, OneWayPipe) {
auto pipe = newOneWayPipe(); auto pipe = newOneWayPipe();
char receiveBuffer[4]; char receiveBuffer[4];
loop.daemonize(pipe.out->write("foo", 3)); daemonize(pipe.out->write("foo", 3), [](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) { kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) {
EXPECT_EQ(3u, n); EXPECT_EQ(3u, n);
......
...@@ -583,13 +583,13 @@ private: ...@@ -583,13 +583,13 @@ private:
class SocketNetwork final: public Network { class SocketNetwork final: public Network {
public: public:
Promise<Own<LocalAddress>> parseLocalAddress(StringPtr addr, uint portHint = 0) const override { Promise<Own<LocalAddress>> parseLocalAddress(StringPtr addr, uint portHint = 0) const override {
return EventLoop::current().evalLater(mvCapture(heapString(addr), return evalLater(mvCapture(heapString(addr),
[portHint](String&& addr) -> Own<LocalAddress> { [portHint](String&& addr) -> Own<LocalAddress> {
return heap<LocalSocketAddress>(SocketAddress::parseLocal(addr, portHint)); return heap<LocalSocketAddress>(SocketAddress::parseLocal(addr, portHint));
})); }));
} }
Promise<Own<RemoteAddress>> parseRemoteAddress(StringPtr addr, uint portHint = 0) const override { Promise<Own<RemoteAddress>> parseRemoteAddress(StringPtr addr, uint portHint = 0) const override {
return EventLoop::current().evalLater(mvCapture(heapString(addr), return evalLater(mvCapture(heapString(addr),
[portHint](String&& addr) -> Own<RemoteAddress> { [portHint](String&& addr) -> Own<RemoteAddress> {
return heap<RemoteSocketAddress>(SocketAddress::parse(addr, portHint)); return heap<RemoteSocketAddress>(SocketAddress::parse(addr, portHint));
})); }));
......
...@@ -194,7 +194,7 @@ class IoLoopMainImpl: public IoLoopMain { ...@@ -194,7 +194,7 @@ class IoLoopMainImpl: public IoLoopMain {
public: public:
IoLoopMainImpl(Func&& func): func(kj::mv(func)) {} IoLoopMainImpl(Func&& func): func(kj::mv(func)) {}
void run(EventLoop& loop) override { void run(EventLoop& loop) override {
result = space.construct(loop.wait(loop.evalLater(func))); result = space.construct(kj::evalLater(func).wait());
} }
Result getResult() { return kj::mv(*result); } Result getResult() { return kj::mv(*result); }
...@@ -209,7 +209,7 @@ class IoLoopMainImpl<Func, void>: public IoLoopMain { ...@@ -209,7 +209,7 @@ class IoLoopMainImpl<Func, void>: public IoLoopMain {
public: public:
IoLoopMainImpl(Func&& func): func(kj::mv(func)) {} IoLoopMainImpl(Func&& func): func(kj::mv(func)) {}
void run(EventLoop& loop) override { void run(EventLoop& loop) override {
loop.wait(loop.evalLater(func)); kj::evalLater(func).wait();
} }
void getResult() {} void getResult() {}
...@@ -222,7 +222,7 @@ void runIoEventLoopInternal(IoLoopMain& func); ...@@ -222,7 +222,7 @@ void runIoEventLoopInternal(IoLoopMain& func);
} // namespace _ (private) } // namespace _ (private)
template <typename Func> template <typename Func>
auto runIoEventLoop(Func&& func) -> decltype(instance<EventLoop&>().wait(func())) { auto runIoEventLoop(Func&& func) -> decltype(func().wait()) {
// Sets up an appropriate EventLoop for doing I/O, then executes the given function. The function // Sets up an appropriate EventLoop for doing I/O, then executes the given function. The function
// returns a promise. The EventLoop will continue running until that promise resolves, then the // returns a promise. The EventLoop will continue running until that promise resolves, then the
// whole function will return its resolution. On return, the EventLoop is destroyed, cancelling // whole function will return its resolution. On return, the EventLoop is destroyed, cancelling
...@@ -235,7 +235,7 @@ auto runIoEventLoop(Func&& func) -> decltype(instance<EventLoop&>().wait(func()) ...@@ -235,7 +235,7 @@ auto runIoEventLoop(Func&& func) -> decltype(instance<EventLoop&>().wait(func())
// from the implementation details but GCC claimed the two declarations were overloads rather // from the implementation details but GCC claimed the two declarations were overloads rather
// than the same function, even though the signature was identical. FFFFFFFFFFUUUUUUUUUUUUUUU- // than the same function, even though the signature was identical. FFFFFFFFFFUUUUUUUUUUUUUUU-
typedef decltype(instance<EventLoop&>().wait(instance<Func>()())) Result; typedef decltype(instance<Func>()().wait()) Result;
_::IoLoopMainImpl<Func, Result> func2(kj::fwd<Func>(func)); _::IoLoopMainImpl<Func, Result> func2(kj::fwd<Func>(func));
_::runIoEventLoopInternal(func2); _::runIoEventLoopInternal(func2);
return func2.getResult(); return func2.getResult();
......
...@@ -238,13 +238,13 @@ TEST(Async, Ordering) { ...@@ -238,13 +238,13 @@ TEST(Async, Ordering) {
int counter = 0; int counter = 0;
Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr}; Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr};
promises[1] = loop.evalLater([&]() { promises[1] = evalLater([&]() {
EXPECT_EQ(0, counter++); EXPECT_EQ(0, counter++);
promises[2] = Promise<void>(READY_NOW).then([&]() { promises[2] = Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(1, counter++); EXPECT_EQ(1, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain. return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
}); });
promises[3] = loop.evalLater([&]() { promises[3] = evalLater([&]() {
EXPECT_EQ(4, counter++); EXPECT_EQ(4, counter++);
return Promise<void>(READY_NOW).then([&]() { return Promise<void>(READY_NOW).then([&]() {
EXPECT_EQ(5, counter++); EXPECT_EQ(5, counter++);
...@@ -254,12 +254,12 @@ TEST(Async, Ordering) { ...@@ -254,12 +254,12 @@ TEST(Async, Ordering) {
EXPECT_EQ(2, counter++); EXPECT_EQ(2, counter++);
return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain. return Promise<void>(READY_NOW); // Force proactive evaluation by faking a chain.
}); });
promises[5] = loop.evalLater([&]() { promises[5] = evalLater([&]() {
EXPECT_EQ(6, counter++); EXPECT_EQ(6, counter++);
}); });
}); });
promises[0] = loop.evalLater([&]() { promises[0] = evalLater([&]() {
EXPECT_EQ(3, counter++); EXPECT_EQ(3, counter++);
// Making this a chain should NOT cause it to preempt promises[1]. (This was a problem at one // Making this a chain should NOT cause it to preempt promises[1]. (This was a problem at one
...@@ -465,5 +465,27 @@ TEST(Async, EagerlyEvaluate) { ...@@ -465,5 +465,27 @@ TEST(Async, EagerlyEvaluate) {
EXPECT_TRUE(called); EXPECT_TRUE(called);
} }
TEST(Async, Daemonize) {
SimpleEventLoop loop;
bool ran1 = false;
bool ran2 = false;
bool ran3 = false;
evalLater([&]() { ran1 = true; });
daemonize(evalLater([&]() { ran2 = true; }), [](kj::Exception&&) { ADD_FAILURE(); });
daemonize(evalLater([]() { KJ_FAIL_ASSERT("foo"); }), [&](kj::Exception&& e) { ran3 = true; });
EXPECT_FALSE(ran1);
EXPECT_FALSE(ran2);
EXPECT_FALSE(ran3);
evalLater([]() {}).wait();
EXPECT_FALSE(ran1);
EXPECT_TRUE(ran2);
EXPECT_TRUE(ran3);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -84,9 +84,11 @@ TEST_F(AsyncUnixTest, SignalWithValue) { ...@@ -84,9 +84,11 @@ TEST_F(AsyncUnixTest, SignalWithValue) {
TEST_F(AsyncUnixTest, SignalsMultiListen) { TEST_F(AsyncUnixTest, SignalsMultiListen) {
UnixEventLoop loop; UnixEventLoop loop;
loop.daemonize(loop.onSignal(SIGIO).then([](siginfo_t&&) { daemonize(loop.onSignal(SIGIO).then([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal."; ADD_FAILURE() << "Received wrong signal.";
})); }), [](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
kill(getpid(), SIGUSR2); kill(getpid(), SIGUSR2);
...@@ -145,10 +147,12 @@ TEST_F(AsyncUnixTest, PollMultiListen) { ...@@ -145,10 +147,12 @@ TEST_F(AsyncUnixTest, PollMultiListen) {
KJ_SYSCALL(pipe(bogusPipefds)); KJ_SYSCALL(pipe(bogusPipefds));
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); }); KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
loop.daemonize(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) { daemonize(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) {
KJ_DBG(s); KJ_DBG(s);
ADD_FAILURE() << "Received wrong poll."; ADD_FAILURE() << "Received wrong poll.";
})); }), [](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
});
int pipefds[2]; int pipefds[2];
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
......
...@@ -44,13 +44,13 @@ namespace { ...@@ -44,13 +44,13 @@ namespace {
static __thread EventLoop* threadLocalEventLoop = nullptr; static __thread EventLoop* threadLocalEventLoop = nullptr;
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::EventLoop::Event*>(1) #define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1)
class BoolEvent: public EventLoop::Event { class BoolEvent: public _::Event {
public: public:
bool fired = false; bool fired = false;
Maybe<Own<Event>> fire() override { Maybe<Own<_::Event>> fire() override {
fired = true; fired = true;
return nullptr; return nullptr;
} }
...@@ -58,7 +58,7 @@ public: ...@@ -58,7 +58,7 @@ public:
class YieldPromiseNode final: public _::PromiseNode { class YieldPromiseNode final: public _::PromiseNode {
public: public:
bool onReady(EventLoop::Event& event) noexcept override { bool onReady(_::Event& event) noexcept override {
event.armBreadthFirst(); event.armBreadthFirst();
return false; return false;
} }
...@@ -86,7 +86,7 @@ public: ...@@ -86,7 +86,7 @@ public:
} }
} }
class Task final: public EventLoop::Event { class Task final: public Event {
public: public:
Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam) Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
: taskSet(taskSet), node(kj::mv(nodeParam)) { : taskSet(taskSet), node(kj::mv(nodeParam)) {
...@@ -199,9 +199,9 @@ EventLoop::~EventLoop() noexcept(false) { ...@@ -199,9 +199,9 @@ EventLoop::~EventLoop() noexcept(false) {
KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue. Memory leak?", KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue. Memory leak?",
head->trace()) { head->trace()) {
// Unlink all the events and hope that no one ever fires them... // Unlink all the events and hope that no one ever fires them...
Event* event = head; _::Event* event = head;
while (event != nullptr) { while (event != nullptr) {
Event* next = event->next; _::Event* next = event->next;
event->next = nullptr; event->next = nullptr;
event->prev = nullptr; event->prev = nullptr;
event = next; event = next;
...@@ -232,7 +232,7 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result ...@@ -232,7 +232,7 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result
} }
sleep(); sleep();
} else { } else {
Event* event = head; _::Event* event = head;
head = event->next; head = event->next;
depthFirstInsertPoint = &head; depthFirstInsertPoint = &head;
...@@ -243,7 +243,7 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result ...@@ -243,7 +243,7 @@ void EventLoop::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result
event->next = nullptr; event->next = nullptr;
event->prev = nullptr; event->prev = nullptr;
Maybe<Own<Event>> eventToDestroy; Maybe<Own<_::Event>> eventToDestroy;
{ {
event->firing = true; event->firing = true;
KJ_DEFER(event->firing = false); KJ_DEFER(event->firing = false);
...@@ -271,10 +271,12 @@ void EventLoop::daemonize(kj::Promise<void>&& promise) { ...@@ -271,10 +271,12 @@ void EventLoop::daemonize(kj::Promise<void>&& promise) {
daemons->add(kj::mv(promise)); daemons->add(kj::mv(promise));
} }
EventLoop::Event::Event() namespace _ { // private
Event::Event()
: loop(EventLoop::current()), next(nullptr), prev(nullptr) {} : loop(EventLoop::current()), next(nullptr), prev(nullptr) {}
EventLoop::Event::~Event() noexcept(false) { Event::~Event() noexcept(false) {
if (prev != nullptr) { if (prev != nullptr) {
if (loop.head == this) { if (loop.head == this) {
loop.head = next; loop.head = next;
...@@ -297,7 +299,7 @@ EventLoop::Event::~Event() noexcept(false) { ...@@ -297,7 +299,7 @@ EventLoop::Event::~Event() noexcept(false) {
"Promise destroyed from a different thread than it was created in."); "Promise destroyed from a different thread than it was created in.");
} }
void EventLoop::Event::armDepthFirst() { void Event::armDepthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use " "Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread."); "the thread-safe work queue to queue events cross-thread.");
...@@ -318,7 +320,7 @@ void EventLoop::Event::armDepthFirst() { ...@@ -318,7 +320,7 @@ void EventLoop::Event::armDepthFirst() {
} }
} }
void EventLoop::Event::armBreadthFirst() { void Event::armBreadthFirst() {
KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr, KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
"Event armed from different thread than it was created in. You must use " "Event armed from different thread than it was created in. You must use "
"the thread-safe work queue to queue events cross-thread."); "the thread-safe work queue to queue events cross-thread.");
...@@ -335,7 +337,7 @@ void EventLoop::Event::armBreadthFirst() { ...@@ -335,7 +337,7 @@ void EventLoop::Event::armBreadthFirst() {
} }
} }
_::PromiseNode* EventLoop::Event::getInnerForTrace() { _::PromiseNode* Event::getInnerForTrace() {
return nullptr; return nullptr;
} }
...@@ -353,7 +355,7 @@ static kj::String demangleTypeName(const char* name) { ...@@ -353,7 +355,7 @@ static kj::String demangleTypeName(const char* name) {
} }
#endif #endif
static kj::String traceImpl(EventLoop::Event* event, _::PromiseNode* node) { static kj::String traceImpl(Event* event, _::PromiseNode* node) {
kj::Vector<kj::String> trace; kj::Vector<kj::String> trace;
if (event != nullptr) { if (event != nullptr) {
...@@ -368,10 +370,12 @@ static kj::String traceImpl(EventLoop::Event* event, _::PromiseNode* node) { ...@@ -368,10 +370,12 @@ static kj::String traceImpl(EventLoop::Event* event, _::PromiseNode* node) {
return strArray(trace, "\n"); return strArray(trace, "\n");
} }
kj::String EventLoop::Event::trace() { kj::String Event::trace() {
return traceImpl(this, getInnerForTrace()); return traceImpl(this, getInnerForTrace());
} }
} // namespace _ (private)
// ======================================================================================= // =======================================================================================
#if KJ_USE_FUTEX #if KJ_USE_FUTEX
...@@ -473,7 +477,7 @@ namespace _ { // private ...@@ -473,7 +477,7 @@ namespace _ { // private
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; } PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
bool PromiseNode::OnReadyEvent::init(EventLoop::Event& newEvent) { bool PromiseNode::OnReadyEvent::init(Event& newEvent) {
if (event == _kJ_ALREADY_READY) { if (event == _kJ_ALREADY_READY) {
return true; return true;
} else { } else {
...@@ -492,7 +496,7 @@ void PromiseNode::OnReadyEvent::arm() { ...@@ -492,7 +496,7 @@ void PromiseNode::OnReadyEvent::arm() {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
bool ImmediatePromiseNodeBase::onReady(EventLoop::Event& event) noexcept { return true; } bool ImmediatePromiseNodeBase::onReady(Event& event) noexcept { return true; }
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception) ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
: exception(kj::mv(exception)) {} : exception(kj::mv(exception)) {}
...@@ -506,7 +510,7 @@ void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept { ...@@ -506,7 +510,7 @@ void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency) AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency)
: dependency(kj::mv(dependency)) {} : dependency(kj::mv(dependency)) {}
bool AttachmentPromiseNodeBase::onReady(EventLoop::Event& event) noexcept { bool AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
return dependency->onReady(event); return dependency->onReady(event);
} }
...@@ -527,7 +531,7 @@ void AttachmentPromiseNodeBase::dropDependency() { ...@@ -527,7 +531,7 @@ void AttachmentPromiseNodeBase::dropDependency() {
TransformPromiseNodeBase::TransformPromiseNodeBase(Own<PromiseNode>&& dependency) TransformPromiseNodeBase::TransformPromiseNodeBase(Own<PromiseNode>&& dependency)
: dependency(kj::mv(dependency)) {} : dependency(kj::mv(dependency)) {}
bool TransformPromiseNodeBase::onReady(EventLoop::Event& event) noexcept { bool TransformPromiseNodeBase::onReady(Event& event) noexcept {
return dependency->onReady(event); return dependency->onReady(event);
} }
...@@ -591,7 +595,7 @@ void ForkBranchBase::releaseHub(ExceptionOrValue& output) { ...@@ -591,7 +595,7 @@ void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
} }
} }
bool ForkBranchBase::onReady(EventLoop::Event& event) noexcept { bool ForkBranchBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event); return onReadyEvent.init(event);
} }
...@@ -606,7 +610,7 @@ ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& result ...@@ -606,7 +610,7 @@ ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& result
if (inner->onReady(*this)) armDepthFirst(); if (inner->onReady(*this)) armDepthFirst();
} }
Maybe<Own<EventLoop::Event>> ForkHubBase::fire() { Maybe<Own<Event>> ForkHubBase::fire() {
// Dependency is ready. Fetch its result and then delete the node. // Dependency is ready. Fetch its result and then delete the node.
inner->get(resultRef); inner->get(resultRef);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
...@@ -641,7 +645,7 @@ ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam) ...@@ -641,7 +645,7 @@ ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {} ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
bool ChainPromiseNode::onReady(EventLoop::Event& event) noexcept { bool ChainPromiseNode::onReady(Event& event) noexcept {
switch (state) { switch (state) {
case STEP1: case STEP1:
KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once."); KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
...@@ -662,7 +666,7 @@ PromiseNode* ChainPromiseNode::getInnerForTrace() { ...@@ -662,7 +666,7 @@ PromiseNode* ChainPromiseNode::getInnerForTrace() {
return inner; return inner;
} }
Maybe<Own<EventLoop::Event>> ChainPromiseNode::fire() { Maybe<Own<Event>> ChainPromiseNode::fire() {
KJ_REQUIRE(state != STEP2); KJ_REQUIRE(state != STEP2);
static_assert(sizeof(Promise<int>) == sizeof(PromiseBase), static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
...@@ -709,7 +713,7 @@ ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<Pr ...@@ -709,7 +713,7 @@ ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<Pr
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {} ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
bool ExclusiveJoinPromiseNode::onReady(EventLoop::Event& event) noexcept { bool ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
return onReadyEvent.init(event); return onReadyEvent.init(event);
} }
...@@ -742,7 +746,7 @@ bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) { ...@@ -742,7 +746,7 @@ bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) {
} }
} }
Maybe<Own<EventLoop::Event>> ExclusiveJoinPromiseNode::Branch::fire() { Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() {
// Cancel the branch that didn't return first. Ignore exceptions caused by cancellation. // Cancel the branch that didn't return first. Ignore exceptions caused by cancellation.
if (this == &joinNode.left) { if (this == &joinNode.left) {
kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; }); kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; });
...@@ -766,7 +770,7 @@ EagerPromiseNodeBase::EagerPromiseNodeBase( ...@@ -766,7 +770,7 @@ EagerPromiseNodeBase::EagerPromiseNodeBase(
if (dependency->onReady(*this)) armDepthFirst(); if (dependency->onReady(*this)) armDepthFirst();
} }
bool EagerPromiseNodeBase::onReady(EventLoop::Event& event) noexcept { bool EagerPromiseNodeBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event); return onReadyEvent.init(event);
} }
...@@ -774,7 +778,7 @@ PromiseNode* EagerPromiseNodeBase::getInnerForTrace() { ...@@ -774,7 +778,7 @@ PromiseNode* EagerPromiseNodeBase::getInnerForTrace() {
return dependency; return dependency;
} }
Maybe<Own<EventLoop::Event>> EagerPromiseNodeBase::fire() { Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
dependency->get(resultRef); dependency->get(resultRef);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() { KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
dependency = nullptr; dependency = nullptr;
...@@ -788,7 +792,7 @@ Maybe<Own<EventLoop::Event>> EagerPromiseNodeBase::fire() { ...@@ -788,7 +792,7 @@ Maybe<Own<EventLoop::Event>> EagerPromiseNodeBase::fire() {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
bool AdapterPromiseNodeBase::onReady(EventLoop::Event& event) noexcept { bool AdapterPromiseNodeBase::onReady(Event& event) noexcept {
return onReadyEvent.init(event); return onReadyEvent.init(event);
} }
......
...@@ -167,6 +167,8 @@ class ForkHub; ...@@ -167,6 +167,8 @@ class ForkHub;
class TaskSetImpl; class TaskSetImpl;
class Event;
} // namespace _ (private) } // namespace _ (private)
// ======================================================================================= // =======================================================================================
...@@ -218,127 +220,10 @@ public: ...@@ -218,127 +220,10 @@ public:
// Get the event loop for the current thread. Throws an exception if no event loop is active. // Get the event loop for the current thread. Throws an exception if no event loop is active.
bool isCurrent() const; bool isCurrent() const;
// Is this EventLoop the current one for this thread? // Is this EventLoop the current one for this thread? This can safely be called from any thread.
template <typename T>
T wait(Promise<T>&& promise);
// Run the event loop until the promise is fulfilled, then return its result. If the promise
// is rejected, throw an exception.
//
// wait() cannot be called recursively -- that is, an event callback cannot call wait().
// Instead, callbacks that need to perform more async operations should return a promise and
// rely on promise chaining.
//
// wait() is primarily useful at the top level of a program -- typically, within the function
// that allocated the EventLoop. For example, a program that performs one or two RPCs and then
// exits would likely use wait() in its main() function to wait on each RPC. On the other hand,
// server-side code generally cannot use wait(), because it has to be able to accept multiple
// requests at once.
//
// If the promise is rejected, `wait()` throws an exception. This exception is usually fatal,
// so if compiled with -fno-exceptions, the process will abort. You may work around this by
// using `there()` with an error handler to handle this case. If your error handler throws a
// non-fatal exception and then recovers by returning a dummy value, wait() will also throw a
// non-fatal exception and return the same dummy value.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
template <typename Func> void runForever() KJ_NORETURN;
PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT; // Runs the loop forever. Useful for servers.
// Schedule for the given zero-parameter function to be executed in the event loop at some
// point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
// a promise, `evalLater()` returns a Promise for the result of resolving that promise.
//
// Example usage:
// Promise<int> x = loop.evalLater([]() { return 123; });
//
// If the returned promise is destroyed before the callback runs, the callback will be canceled.
// If the returned promise is destroyed while the callback is running in another thread, the
// destructor will block until the callback completes.
//
// If you schedule several evaluations with `evalLater`, they will be executed in order.
//
// `evalLater()` is equivalent to `there()` chained on `Promise<void>(READY_NOW)`.
template <typename T, typename Func, typename ErrorFunc = _::PropagateException>
PromiseForResult<Func, T> there(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler = _::PropagateException())
KJ_WARN_UNUSED_RESULT;
// Like `Promise::then()`, but schedules the continuation to be executed on *this* EventLoop
// rather than the thread's current loop. See Promise::then().
template <typename T>
ForkedPromise<T> fork(Promise<T>&& promise);
// Like `Promise::fork()`, but manages the fork on *this* EventLoop rather than the thread's
// current loop. See Promise::fork().
template <typename T>
Promise<T> exclusiveJoin(Promise<T>&& promise1, Promise<T>&& promise2);
// Like `promise1.exclusiveJoin(promise2)`, returning the joined promise.
void daemonize(kj::Promise<void>&& promise);
// Allows the given promise to continue running in the background until it completes or the
// `EventLoop` is destroyed. Be careful when using this: you need to make sure that the promise
// owns all the objects it touches or make sure those objects outlive the EventLoop. Also, be
// careful about error handling: exceptions will merely be logged with KJ_LOG(ERROR, ...).
//
// This method exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
// canceled unless the callee explicitly permits it.
// -----------------------------------------------------------------
// Low-level interface.
class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this
// internally.
//
// WARNING: This class is difficult to use correctly. It's easy to have subtle race
// conditions.
public:
Event();
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void armDepthFirst();
// Enqueue this event so that `fire()` will be called from the event loop soon.
//
// Events scheduled in this way are executed in depth-first order: if an event callback arms
// more events, those events are placed at the front of the queue (in the order in which they
// were armed), so that they run immediately after the first event's callback returns.
//
// Depth-first event scheduling is appropriate for events that represent simple continuations
// of a previous event that should be globbed together for performance. Depth-first scheduling
// can lead to starvation, so any long-running task must occasionally yield with
// `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
// breadth-first.)
//
// To use breadth-first scheduling instead, use `armLater()`.
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
kj::String trace();
// Dump debug info about this event.
protected:
virtual Maybe<Own<Event>> fire() = 0;
// Fire the event. Possibly returns a pointer to itself, which will be discarded by the
// caller. This is the only way that an event can delete itself as a result of firing, as
// doing so from within fire() will throw an exception.
virtual _::PromiseNode* getInnerForTrace();
// If this event wraps a PromiseNode, get that node. Used for debug tracing.
// Default implementation returns nullptr.
private:
friend class EventLoop;
EventLoop& loop;
Event* next;
Event** prev;
bool firing = false;
};
protected: protected:
// ----------------------------------------------------------------- // -----------------------------------------------------------------
...@@ -363,15 +248,14 @@ private: ...@@ -363,15 +248,14 @@ private:
bool running = false; bool running = false;
// True while looping -- wait() is then not allowed. // True while looping -- wait() is then not allowed.
Event* head = nullptr; _::Event* head = nullptr;
Event** tail = &head; _::Event** tail = &head;
Event** depthFirstInsertPoint = &head; _::Event** depthFirstInsertPoint = &head;
Own<_::TaskSetImpl> daemons; Own<_::TaskSetImpl> daemons;
template <typename T, typename Func, typename ErrorFunc> template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler); Own<_::PromiseNode> thenImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler);
// Shared implementation of there() and Promise::then().
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result); void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result);
// Run the event loop until `node` is fulfilled, and then `get()` its result into `result`. // Run the event loop until `node` is fulfilled, and then `get()` its result into `result`.
...@@ -380,9 +264,22 @@ private: ...@@ -380,9 +264,22 @@ private:
// Returns a promise that won't resolve until all events currently on the queue are fired. // Returns a promise that won't resolve until all events currently on the queue are fired.
// Otherwise, returns an already-resolved promise. Used to implement evalLater(). // Otherwise, returns an already-resolved promise. Used to implement evalLater().
template <typename T>
T wait(Promise<T>&& promise);
template <typename Func>
PromiseForResult<Func, void> evalLater(Func&& func) KJ_WARN_UNUSED_RESULT;
void daemonize(kj::Promise<void>&& promise);
template <typename> template <typename>
friend class Promise; friend class Promise;
friend Promise<void> yield(); friend Promise<void> yield();
template <typename ErrorFunc>
friend void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler);
template <typename Func>
friend PromiseForResult<Func, void> evalLater(Func&& func);
friend class _::Event;
}; };
// ------------------------------------------------------------------- // -------------------------------------------------------------------
...@@ -594,11 +491,26 @@ public: ...@@ -594,11 +491,26 @@ public:
// cross-thread, both of the "optimizations" described above are avoided. // cross-thread, both of the "optimizations" described above are avoided.
T wait(); T wait();
// Equivalent to `EventLoop::current().wait(kj::mv(*this))`. // Run the event loop until the promise is fulfilled, then return its result. If the promise
// is rejected, throw an exception.
// //
// Note that `wait()` consumes the promise on which it is called, in the sense of move semantics. // wait() cannot be called recursively -- that is, an event callback cannot call wait().
// After returning, the promise is no longer valid, and cannot be `wait()`ed on or `then()`ed // Instead, callbacks that need to perform more async operations should return a promise and
// again. // rely on promise chaining.
//
// wait() is primarily useful at the top level of a program -- typically, within the function
// that allocated the EventLoop. For example, a program that performs one or two RPCs and then
// exits would likely use wait() in its main() function to wait on each RPC. On the other hand,
// server-side code generally cannot use wait(), because it has to be able to accept multiple
// requests at once.
//
// If the promise is rejected, `wait()` throws an exception. If the program was compiled without
// exceptions (-fno-exceptions), this will usually abort. In this case you really should first
// use `then()` to set an appropriate handler for the exception case, so that the promise you
// actually wait on never throws.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
ForkedPromise<T> fork(); ForkedPromise<T> fork();
// Forks the promise, so that multiple different clients can independently wait on the result. // Forks the promise, so that multiple different clients can independently wait on the result.
...@@ -698,9 +610,31 @@ constexpr _::Void READY_NOW = _::Void(); ...@@ -698,9 +610,31 @@ constexpr _::Void READY_NOW = _::Void();
// cast to `Promise<void>`. // cast to `Promise<void>`.
template <typename Func> template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) { PromiseForResult<Func, void> evalLater(Func&& func);
return EventLoop::current().evalLater(kj::fwd<Func>(func)); // Schedule for the given zero-parameter function to be executed in the event loop at some
} // point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
// a promise, `evalLater()` returns a Promise for the result of resolving that promise.
//
// Example usage:
// Promise<int> x = evalLater([]() { return 123; });
//
// If the returned promise is destroyed before the callback runs, the callback will be canceled
// (never called).
//
// If you schedule several evaluations with `evalLater`, they will be executed in order.
template <typename ErrorFunc>
void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler);
// Allows the given promise to continue running in the background until it completes or the
// `EventLoop` is destroyed. Be careful when using this: since you can no longer cancel this
// promise, you need to make sure that the promise owns all the objects it touches or make sure
// those objects outlive the EventLoop.
//
// `errorHandler` is a function that takes `kj::Exception&&`, like the second parameter to `then()`,
// except that it must return void.
//
// This function exists mainly to implement the Cap'n Proto requirement that RPC calls cannot be
// canceled unless the callee explicitly permits it.
// ------------------------------------------------------------------- // -------------------------------------------------------------------
// Hack for creating a lambda that holds an owned pointer. // Hack for creating a lambda that holds an owned pointer.
...@@ -870,6 +804,54 @@ public: ...@@ -870,6 +804,54 @@ public:
Maybe<T> value; Maybe<T> value;
}; };
class Event {
// An event waiting to be executed. Not for direct use by applications -- promises use this
// internally.
public:
Event();
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void armDepthFirst();
// Enqueue this event so that `fire()` will be called from the event loop soon.
//
// Events scheduled in this way are executed in depth-first order: if an event callback arms
// more events, those events are placed at the front of the queue (in the order in which they
// were armed), so that they run immediately after the first event's callback returns.
//
// Depth-first event scheduling is appropriate for events that represent simple continuations
// of a previous event that should be globbed together for performance. Depth-first scheduling
// can lead to starvation, so any long-running task must occasionally yield with
// `armBreadthFirst()`. (Promise::then() uses depth-first whereas evalLater() uses
// breadth-first.)
//
// To use breadth-first scheduling instead, use `armLater()`.
void armBreadthFirst();
// Like `armDepthFirst()` except that the event is placed at the end of the queue.
kj::String trace();
// Dump debug info about this event.
virtual _::PromiseNode* getInnerForTrace();
// If this event wraps a PromiseNode, get that node. Used for debug tracing.
// Default implementation returns nullptr.
protected:
virtual Maybe<Own<Event>> fire() = 0;
// Fire the event. Possibly returns a pointer to itself, which will be discarded by the
// caller. This is the only way that an event can delete itself as a result of firing, as
// doing so from within fire() will throw an exception.
private:
friend class kj::EventLoop;
EventLoop& loop;
Event* next;
Event** prev;
bool firing = false;
};
class PromiseNode { class PromiseNode {
// A Promise<T> contains a chain of PromiseNodes tracking the pending transformations. // A Promise<T> contains a chain of PromiseNodes tracking the pending transformations.
// //
...@@ -879,7 +861,7 @@ class PromiseNode { ...@@ -879,7 +861,7 @@ class PromiseNode {
// internal implementation details. // internal implementation details.
public: public:
virtual bool onReady(EventLoop::Event& event) noexcept = 0; virtual bool onReady(Event& event) noexcept = 0;
// Returns true if already ready, otherwise arms the given event when ready. // Returns true if already ready, otherwise arms the given event when ready.
virtual void get(ExceptionOrValue& output) noexcept = 0; virtual void get(ExceptionOrValue& output) noexcept = 0;
...@@ -896,7 +878,7 @@ protected: ...@@ -896,7 +878,7 @@ protected:
// Helper class for implementing onReady(). // Helper class for implementing onReady().
public: public:
bool init(EventLoop::Event& newEvent); bool init(Event& newEvent);
// Returns true if arm() was already called. // Returns true if arm() was already called.
void arm(); void arm();
...@@ -904,7 +886,7 @@ protected: ...@@ -904,7 +886,7 @@ protected:
// true. // true.
private: private:
EventLoop::Event* event = nullptr; Event* event = nullptr;
}; };
}; };
...@@ -912,7 +894,7 @@ protected: ...@@ -912,7 +894,7 @@ protected:
class ImmediatePromiseNodeBase: public PromiseNode { class ImmediatePromiseNodeBase: public PromiseNode {
public: public:
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
}; };
template <typename T> template <typename T>
...@@ -946,7 +928,7 @@ class AttachmentPromiseNodeBase: public PromiseNode { ...@@ -946,7 +928,7 @@ class AttachmentPromiseNodeBase: public PromiseNode {
public: public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency); AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override; void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
...@@ -985,7 +967,7 @@ class TransformPromiseNodeBase: public PromiseNode { ...@@ -985,7 +967,7 @@ class TransformPromiseNodeBase: public PromiseNode {
public: public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency); TransformPromiseNodeBase(Own<PromiseNode>&& dependency);
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override; void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
...@@ -1055,7 +1037,7 @@ public: ...@@ -1055,7 +1037,7 @@ public:
// Called by the hub to indicate that it is ready. // Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------ // implements PromiseNode ------------------------------------------
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
protected: protected:
...@@ -1099,7 +1081,7 @@ public: ...@@ -1099,7 +1081,7 @@ public:
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class ForkHubBase: public Refcounted, protected EventLoop::Event { class ForkHubBase: public Refcounted, protected Event {
public: public:
ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef); ForkHubBase(Own<PromiseNode>&& inner, ExceptionOrValue& resultRef);
...@@ -1142,12 +1124,12 @@ inline ExceptionOrValue& ForkBranchBase::getHubResultRef() { ...@@ -1142,12 +1124,12 @@ inline ExceptionOrValue& ForkBranchBase::getHubResultRef() {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class ChainPromiseNode final: public PromiseNode, private EventLoop::Event { class ChainPromiseNode final: public PromiseNode, private Event {
public: public:
explicit ChainPromiseNode(Own<PromiseNode> inner); explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false); ~ChainPromiseNode() noexcept(false);
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override; void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
...@@ -1185,12 +1167,12 @@ public: ...@@ -1185,12 +1167,12 @@ public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right); ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false); ~ExclusiveJoinPromiseNode() noexcept(false);
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
void get(ExceptionOrValue& output) noexcept override; void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
private: private:
class Branch: public EventLoop::Event { class Branch: public Event {
public: public:
Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency); Branch(ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependency);
~Branch() noexcept(false); ~Branch() noexcept(false);
...@@ -1213,14 +1195,14 @@ private: ...@@ -1213,14 +1195,14 @@ private:
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected EventLoop::Event { class EagerPromiseNodeBase: public PromiseNode, protected Event {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
// evaluate it. // evaluate it.
public: public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef); EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
PromiseNode* getInnerForTrace() override; PromiseNode* getInnerForTrace() override;
private: private:
...@@ -1257,7 +1239,7 @@ Own<PromiseNode> spark(Own<PromiseNode>&& node) { ...@@ -1257,7 +1239,7 @@ Own<PromiseNode> spark(Own<PromiseNode>&& node) {
class AdapterPromiseNodeBase: public PromiseNode { class AdapterPromiseNodeBase: public PromiseNode {
public: public:
bool onReady(EventLoop::Event& event) noexcept override; bool onReady(Event& event) noexcept override;
protected: protected:
inline void setReady() { inline void setReady() {
...@@ -1334,22 +1316,15 @@ T EventLoop::wait(Promise<T>&& promise) { ...@@ -1334,22 +1316,15 @@ T EventLoop::wait(Promise<T>&& promise) {
template <typename Func> template <typename Func>
PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) { PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) {
// Invoke thereImpl() on yield(). Always spark the result. // Invoke thenImpl() on yield(). Always spark the result.
return PromiseForResult<Func, void>(false, return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>( _::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
thereImpl(yield(), kj::fwd<Func>(func), _::PropagateException()))); thenImpl(yield(), kj::fwd<Func>(func), _::PropagateException())));
}
template <typename T, typename Func, typename ErrorFunc>
PromiseForResult<Func, T> EventLoop::there(
Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) {
return PromiseForResult<Func, T>(false, thereImpl(
kj::mv(promise), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)));
} }
template <typename T, typename Func, typename ErrorFunc> template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> EventLoop::thereImpl(Promise<T>&& promise, Func&& func, Own<_::PromiseNode> EventLoop::thenImpl(Promise<T>&& promise, Func&& func,
ErrorFunc&& errorHandler) { ErrorFunc&& errorHandler) {
typedef _::FixVoid<_::ReturnType<Func, T>> ResultT; typedef _::FixVoid<_::ReturnType<Func, T>> ResultT;
Own<_::PromiseNode> intermediate = Own<_::PromiseNode> intermediate =
...@@ -1369,7 +1344,7 @@ Promise<T>::Promise(kj::Exception&& exception) ...@@ -1369,7 +1344,7 @@ Promise<T>::Promise(kj::Exception&& exception)
template <typename T> template <typename T>
template <typename Func, typename ErrorFunc> template <typename Func, typename ErrorFunc>
PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) { PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler) {
return PromiseForResult<Func, T>(false, EventLoop::current().thereImpl( return PromiseForResult<Func, T>(false, EventLoop::current().thenImpl(
kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler))); kj::mv(*this), kj::fwd<Func>(func), kj::fwd<ErrorFunc>(errorHandler)));
} }
...@@ -1383,12 +1358,6 @@ ForkedPromise<T> Promise<T>::fork() { ...@@ -1383,12 +1358,6 @@ ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))); return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
} }
template <typename T>
ForkedPromise<T> EventLoop::fork(Promise<T>&& promise) {
return ForkedPromise<T>(false,
refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(promise.node)));
}
template <typename T> template <typename T>
Promise<T> ForkedPromise<T>::addBranch() { Promise<T> ForkedPromise<T>::addBranch() {
return hub->addBranch(); return hub->addBranch();
...@@ -1399,12 +1368,6 @@ void Promise<T>::exclusiveJoin(Promise<T>&& other) { ...@@ -1399,12 +1368,6 @@ void Promise<T>::exclusiveJoin(Promise<T>&& other) {
node = heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node)); node = heap<_::ExclusiveJoinPromiseNode>(kj::mv(node), kj::mv(other.node));
} }
template <typename T>
Promise<T> EventLoop::exclusiveJoin(Promise<T>&& promise1, Promise<T>&& promise2) {
return Promise<T>(false, heap<_::ExclusiveJoinPromiseNode>(
kj::mv(promise1.node), kj::mv(promise2.node)));
}
template <typename T> template <typename T>
template <typename... Attachments> template <typename... Attachments>
void Promise<T>::attach(Attachments&&... attachments) { void Promise<T>::attach(Attachments&&... attachments) {
...@@ -1417,6 +1380,16 @@ void Promise<T>::eagerlyEvaluate() { ...@@ -1417,6 +1380,16 @@ void Promise<T>::eagerlyEvaluate() {
node = _::spark<_::FixVoid<T>>(kj::mv(node)); node = _::spark<_::FixVoid<T>>(kj::mv(node));
} }
template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) {
return EventLoop::current().evalLater(kj::fwd<Func>(func));
}
template <typename ErrorFunc>
void daemonize(kj::Promise<void>&& promise, ErrorFunc&& errorHandler) {
return EventLoop::current().daemonize(promise.then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
}
// ======================================================================================= // =======================================================================================
namespace _ { // private namespace _ { // private
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment