Commit 64007d11 authored by Kenton Varda's avatar Kenton Varda

Make EventLoop current when constructed, so that you can use the Promise methods…

Make EventLoop current when constructed, so that you can use the Promise methods even when the loop hasn't actually started yet.
parent 98d72869
......@@ -29,17 +29,8 @@
namespace kj {
namespace {
class DummyErrorHandler: public TaskSet::ErrorHandler {
public:
void taskFailed(kj::Exception&& exception) override {
kj::throwRecoverableException(kj::mv(exception));
}
};
TEST(AsyncIo, SimpleNetwork) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler);
auto network = Network::newSystemNetwork();
Own<ConnectionReceiver> listener;
......@@ -50,46 +41,36 @@ TEST(AsyncIo, SimpleNetwork) {
auto port = newPromiseAndFulfiller<uint>();
tasks.add(loop.evalLater([&]() {
return port.promise
.then([&](uint portnum) {
return network->parseRemoteAddress("127.0.0.1", portnum);
}).then([&](Own<RemoteAddress>&& result) {
return result->connect();
}).then([&](Own<AsyncIoStream>&& result) {
client = kj::mv(result);
return client->write("foo", 3);
});
loop.daemonize(port.promise.then([&](uint portnum) {
return network->parseRemoteAddress("127.0.0.1", portnum);
}).then([&](Own<RemoteAddress>&& result) {
return result->connect();
}).then([&](Own<AsyncIoStream>&& result) {
client = kj::mv(result);
return client->write("foo", 3);
}));
kj::String result = loop.wait(loop.evalLater([&]() {
return network->parseLocalAddress("*")
.then([&](Own<LocalAddress>&& result) {
listener = result->listen();
port.fulfiller->fulfill(listener->getPort());
return listener->accept();
}).then([&](Own<AsyncIoStream>&& result) {
server = kj::mv(result);
return server->tryRead(receiveBuffer, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
});
}));
kj::String result = network->parseLocalAddress("*").then([&](Own<LocalAddress>&& result) {
listener = result->listen();
port.fulfiller->fulfill(listener->getPort());
return listener->accept();
}).then([&](Own<AsyncIoStream>&& result) {
server = kj::mv(result);
return server->tryRead(receiveBuffer, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
}).wait();
EXPECT_EQ("foo", result);
}
String tryParseLocal(EventLoop& loop, Network& network, StringPtr text, uint portHint = 0) {
return loop.wait(loop.evalLater([&]() {
return network.parseLocalAddress(text, portHint);
}))->toString();
return network.parseLocalAddress(text, portHint).wait()->toString();
}
String tryParseRemote(EventLoop& loop, Network& network, StringPtr text, uint portHint = 0) {
return loop.wait(loop.evalLater([&]() {
return network.parseRemoteAddress(text, portHint);
}))->toString();
return network.parseRemoteAddress(text, portHint).wait()->toString();
}
TEST(AsyncIo, AddressParsing) {
......@@ -110,56 +91,42 @@ TEST(AsyncIo, AddressParsing) {
TEST(AsyncIo, OneWayPipe) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler);
auto pipe = newOneWayPipe();
char receiveBuffer[4];
tasks.add(loop.evalLater([&]() {
return pipe.out->write("foo", 3);
}));
loop.daemonize(pipe.out->write("foo", 3));
kj::String result = loop.wait(loop.evalLater([&]() {
return pipe.in->tryRead(receiveBuffer, 3, 4)
.then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
});
}));
kj::String result = pipe.in->tryRead(receiveBuffer, 3, 4).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
}).wait();
EXPECT_EQ("foo", result);
}
TEST(AsyncIo, TwoWayPipe) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
auto pipe = newTwoWayPipe();
char receiveBuffer1[4];
char receiveBuffer2[4];
auto promise = loop.evalLater([&]() {
return pipe.ends[0]->write("foo", 3)
.then([&]() {
return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer1, n);
});
auto promise = pipe.ends[0]->write("foo", 3).then([&]() {
return pipe.ends[0]->tryRead(receiveBuffer1, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer1, n);
});
kj::String result = loop.wait(loop.evalLater([&]() {
return pipe.ends[1]->write("bar", 3)
.then([&]() {
return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer2, n);
});
}));
kj::String result = pipe.ends[1]->write("bar", 3).then([&]() {
return pipe.ends[1]->tryRead(receiveBuffer2, 3, 4);
}).then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer2, n);
}).wait();
kj::String result2 = loop.wait(kj::mv(promise));
kj::String result2 = promise.wait();
EXPECT_EQ("foo", result);
EXPECT_EQ("bar", result2);
......
......@@ -25,6 +25,7 @@
#include "mutex.h"
#include "debug.h"
#include "thread.h"
#include <sched.h>
#include <gtest/gtest.h>
namespace kj {
......@@ -35,9 +36,9 @@ TEST(Async, EvalVoid) {
bool done = false;
Promise<void> promise = loop.evalLater([&]() { done = true; });
Promise<void> promise = evalLater([&]() { done = true; });
EXPECT_FALSE(done);
loop.wait(kj::mv(promise));
promise.wait();
EXPECT_TRUE(done);
}
......@@ -46,9 +47,9 @@ TEST(Async, EvalInt) {
bool done = false;
Promise<int> promise = loop.evalLater([&]() { done = true; return 123; });
Promise<int> promise = evalLater([&]() { done = true; return 123; });
EXPECT_FALSE(done);
EXPECT_EQ(123, loop.wait(kj::mv(promise)));
EXPECT_EQ(123, promise.wait());
EXPECT_TRUE(done);
}
......@@ -58,9 +59,9 @@ TEST(Async, There) {
Promise<int> a = 123;
bool done = false;
Promise<int> promise = loop.there(kj::mv(a), [&](int ai) { done = true; return ai + 321; });
Promise<int> promise = a.then([&](int ai) { done = true; return ai + 321; });
EXPECT_FALSE(done);
EXPECT_EQ(444, loop.wait(kj::mv(promise)));
EXPECT_EQ(444, promise.wait());
EXPECT_TRUE(done);
}
......@@ -70,90 +71,85 @@ TEST(Async, ThereVoid) {
Promise<int> a = 123;
int value = 0;
Promise<void> promise = loop.there(kj::mv(a), [&](int ai) { value = ai; });
Promise<void> promise = a.then([&](int ai) { value = ai; });
EXPECT_EQ(0, value);
loop.wait(kj::mv(promise));
promise.wait();
EXPECT_EQ(123, value);
}
TEST(Async, Exception) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
Promise<int> promise = evalLater(
[&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
EXPECT_TRUE(kj::runCatchingExceptions([&]() {
// wait() only returns when compiling with -fno-exceptions.
EXPECT_EQ(123, loop.wait(kj::mv(promise)));
EXPECT_EQ(123, promise.wait());
}) != nullptr);
}
TEST(Async, HandleException) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
Promise<int> promise = evalLater(
[&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
int line = __LINE__ - 1;
promise = loop.there(kj::mv(promise),
promise = promise.then(
[](int i) { return i + 1; },
[&](Exception&& e) { EXPECT_EQ(line, e.getLine()); return 345; });
EXPECT_EQ(345, loop.wait(kj::mv(promise)));
EXPECT_EQ(345, promise.wait());
}
TEST(Async, PropagateException) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
Promise<int> promise = evalLater(
[&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
int line = __LINE__ - 1;
promise = loop.there(kj::mv(promise),
[](int i) { return i + 1; });
promise = promise.then([](int i) { return i + 1; });
promise = loop.there(kj::mv(promise),
promise = promise.then(
[](int i) { return i + 2; },
[&](Exception&& e) { EXPECT_EQ(line, e.getLine()); return 345; });
EXPECT_EQ(345, loop.wait(kj::mv(promise)));
EXPECT_EQ(345, promise.wait());
}
TEST(Async, PropagateExceptionTypeChange) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
Promise<int> promise = evalLater(
[&]() -> int { KJ_FAIL_ASSERT("foo") { return 123; } });
int line = __LINE__ - 1;
Promise<StringPtr> promise2 = loop.there(kj::mv(promise),
[](int i) -> StringPtr { return "foo"; });
Promise<StringPtr> promise2 = promise.then([](int i) -> StringPtr { return "foo"; });
promise2 = loop.there(kj::mv(promise2),
promise2 = promise2.then(
[](StringPtr s) -> StringPtr { return "bar"; },
[&](Exception&& e) -> StringPtr { EXPECT_EQ(line, e.getLine()); return "baz"; });
EXPECT_EQ("baz", loop.wait(kj::mv(promise2)));
EXPECT_EQ("baz", promise2.wait());
}
TEST(Async, Then) {
SimpleEventLoop loop;
Promise<int> promise = nullptr;
bool outerDone = false;
bool innerDone = false;
bool done = false;
loop.wait(loop.evalLater([&]() {
outerDone = true;
promise = Promise<int>(123).then([&](int i) {
EXPECT_EQ(&loop, &EventLoop::current());
innerDone = true;
return i + 321;
});
}));
Promise<int> promise = Promise<int>(123).then([&](int i) {
EXPECT_EQ(&loop, &EventLoop::current());
done = true;
return i + 321;
});
EXPECT_TRUE(outerDone);
EXPECT_FALSE(innerDone);
EXPECT_FALSE(done);
EXPECT_EQ(444, loop.wait(kj::mv(promise)));
EXPECT_EQ(444, promise.wait());
EXPECT_TRUE(innerDone);
EXPECT_TRUE(done);
}
TEST(Async, ThenInAnyThread) {
......@@ -164,26 +160,25 @@ TEST(Async, ThenInAnyThread) {
Promise<int> promise = a.thenInAnyThread([&](int ai) { done = true; return ai + 321; });
EXPECT_FALSE(done);
EXPECT_EQ(444, loop.wait(kj::mv(promise)));
EXPECT_EQ(444, promise.wait());
EXPECT_TRUE(done);
}
TEST(Async, Chain) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() -> int { return 123; });
Promise<int> promise2 = loop.evalLater([&]() -> int { return 321; });
Promise<int> promise = evalLater([&]() -> int { return 123; });
Promise<int> promise2 = evalLater([&]() -> int { return 321; });
auto promise3 = loop.there(kj::mv(promise),
[&](int i) {
EXPECT_EQ(&loop, &EventLoop::current());
return promise2.then([&loop,i](int j) {
EXPECT_EQ(&loop, &EventLoop::current());
return i + j;
});
});
auto promise3 = promise.then([&](int i) {
EXPECT_EQ(&loop, &EventLoop::current());
return promise2.then([&loop,i](int j) {
EXPECT_EQ(&loop, &EventLoop::current());
return i + j;
});
});
EXPECT_EQ(444, loop.wait(kj::mv(promise3)));
EXPECT_EQ(444, promise3.wait());
}
TEST(Async, SeparateFulfiller) {
......@@ -195,7 +190,7 @@ TEST(Async, SeparateFulfiller) {
pair.fulfiller->fulfill(123);
EXPECT_FALSE(pair.fulfiller->isWaiting());
EXPECT_EQ(123, loop.wait(kj::mv(pair.promise)));
EXPECT_EQ(123, pair.promise.wait());
}
TEST(Async, SeparateFulfillerVoid) {
......@@ -207,7 +202,7 @@ TEST(Async, SeparateFulfillerVoid) {
pair.fulfiller->fulfill();
EXPECT_FALSE(pair.fulfiller->isWaiting());
loop.wait(kj::mv(pair.promise));
pair.promise.wait();
}
TEST(Async, SeparateFulfillerCanceled) {
......@@ -230,7 +225,7 @@ TEST(Async, SeparateFulfillerChained) {
inner.fulfiller->fulfill(123);
EXPECT_EQ(123, loop.wait(kj::mv(pair.promise)));
EXPECT_EQ(123, pair.promise.wait());
}
#if KJ_NO_EXCEPTIONS
......@@ -244,48 +239,73 @@ TEST(Async, SeparateFulfillerDiscarded) {
auto pair = newPromiseAndFulfiller<int>();
pair.fulfiller = nullptr;
EXPECT_ANY_THROW(loop.wait(kj::mv(pair.promise)));
EXPECT_ANY_THROW(pair.promise.wait());
}
TEST(Async, Threads) {
EXPECT_ANY_THROW(EventLoop::current());
SimpleEventLoop loop1;
SimpleEventLoop loop2;
{
SimpleEventLoop loop1;
auto getThreadLoop = newPromiseAndFulfiller<const EventLoop*>();
auto exitThread = newPromiseAndFulfiller<void>();
Thread thread([&]() {
EXPECT_ANY_THROW(EventLoop::current());
{
SimpleEventLoop threadLoop;
getThreadLoop.fulfiller->fulfill(&threadLoop);
exitThread.promise.wait();
}
EXPECT_ANY_THROW(EventLoop::current());
});
auto exitThread = newPromiseAndFulfiller<void>();
// Make sure the thread will exit.
KJ_DEFER(exitThread.fulfiller->fulfill());
Promise<int> promise = loop1.evalLater([]() { return 123; });
promise = loop2.there(kj::mv(promise), [](int ai) { return ai + 321; });
const EventLoop& loop2 = *loop1.wait(kj::mv(getThreadLoop.promise));
for (uint i = 0; i < 100; i++) {
promise = loop1.there(kj::mv(promise), [&](int ai) {
EXPECT_EQ(&loop1, &EventLoop::current());
return ai + 1;
});
promise = loop2.there(kj::mv(promise), [&](int ai) {
EXPECT_EQ(&loop2, &EventLoop::current());
return ai + 1000;
});
Promise<int> promise = evalLater([]() { return 123; });
promise = loop2.there(kj::mv(promise), [](int ai) { return ai + 321; });
for (uint i = 0; i < 100; i++) {
promise = loop1.there(kj::mv(promise), [&](int ai) {
EXPECT_EQ(&loop1, &EventLoop::current());
return ai + 1;
});
promise = loop2.there(kj::mv(promise), [&](int ai) {
EXPECT_EQ(&loop2, &EventLoop::current());
return ai + 1000;
});
}
EXPECT_EQ(100544, loop1.wait(kj::mv(promise)));
}
EXPECT_ANY_THROW(EventLoop::current());
}
TEST(Async, Ordering) {
SimpleEventLoop loop1;
auto getThreadLoop = newPromiseAndFulfiller<const EventLoop*>();
auto exitThread = newPromiseAndFulfiller<void>();
Thread thread([&]() {
EXPECT_ANY_THROW(EventLoop::current());
loop2.wait(kj::mv(exitThread.promise));
{
SimpleEventLoop threadLoop;
getThreadLoop.fulfiller->fulfill(&threadLoop);
exitThread.promise.wait();
}
EXPECT_ANY_THROW(EventLoop::current());
});
// Make sure the thread will exit.
KJ_DEFER(exitThread.fulfiller->fulfill());
EXPECT_EQ(100544, loop1.wait(kj::mv(promise)));
EXPECT_ANY_THROW(EventLoop::current());
}
TEST(Async, Ordering) {
SimpleEventLoop loop1;
SimpleEventLoop loop2;
const EventLoop& loop2 = *loop1.wait(kj::mv(getThreadLoop.promise));
int counter = 0;
Promise<void> promises[6] = {nullptr, nullptr, nullptr, nullptr, nullptr, nullptr};
......@@ -319,17 +339,6 @@ TEST(Async, Ordering) {
return Promise<void>(READY_NOW);
});
auto exitThread = newPromiseAndFulfiller<void>();
Thread thread([&]() {
EXPECT_ANY_THROW(EventLoop::current());
loop2.wait(kj::mv(exitThread.promise));
EXPECT_ANY_THROW(EventLoop::current());
});
// Make sure the thread will exit.
KJ_DEFER(exitThread.fulfiller->fulfill());
for (auto i: indices(promises)) {
loop1.wait(kj::mv(promises[i]));
}
......@@ -346,55 +355,54 @@ TEST(Async, Spark) {
Promise<void> unsparked = nullptr;
Promise<void> then = nullptr;
Promise<void> later = nullptr;
Promise<void> sparked = nullptr;
Thread([&]() {
// `sparked` will evaluate eagerly, even though we never wait on it, because there() is being
// called from outside the event loop.
sparked = loop.there(Promise<void>(READY_NOW), [&]() {
// `unsparked` will never execute because it's attached to the current loop and we never wait
// on it.
unsparked = loop.there(Promise<void>(READY_NOW), [&]() {
ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
});
// `then` will similarly never execute.
then = Promise<void>(READY_NOW).then([&]() {
ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
});
// `sparked` will evaluate eagerly, even though we never wait on it, because there() is being
// called from outside the event loop.
Promise<void> sparked = loop.there(Promise<void>(READY_NOW), [&]() {
// `unsparked` will never execute because it's attached to the current loop and we never wait
// on it.
unsparked = loop.there(Promise<void>(READY_NOW), [&]() {
ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
});
// `then` will similarly never execute.
then = Promise<void>(READY_NOW).then([&]() {
ADD_FAILURE() << "This continuation shouldn't happen because no one waits on it.";
});
// `evalLater` *does* eagerly execute even when queued to the same loop.
later = loop.evalLater([&]() {
notification.fulfiller->fulfill();
// `evalLater` *does* eagerly execute even when queued to the same loop.
later = loop.evalLater([&]() {
notification.fulfiller->fulfill();
});
});
});
loop.wait(kj::mv(notification.promise));
notification.promise.wait();
}
TEST(Async, Fork) {
SimpleEventLoop loop;
auto outer = loop.evalLater([&]() {
Promise<int> promise = loop.evalLater([&]() { return 123; });
auto fork = promise.fork();
auto branch1 = fork.addBranch().then([](int i) {
EXPECT_EQ(123, i);
return 456;
});
auto branch2 = fork.addBranch().then([](int i) {
EXPECT_EQ(123, i);
return 789;
});
Promise<int> promise = evalLater([&]() { return 123; });
{
auto releaseFork = kj::mv(fork);
}
auto fork = promise.fork();
EXPECT_EQ(456, loop.wait(kj::mv(branch1)));
EXPECT_EQ(789, loop.wait(kj::mv(branch2)));
auto branch1 = fork.addBranch().then([](int i) {
EXPECT_EQ(123, i);
return 456;
});
auto branch2 = fork.addBranch().then([](int i) {
EXPECT_EQ(123, i);
return 789;
});
{
auto releaseFork = kj::mv(fork);
}
loop.wait(kj::mv(outer));
EXPECT_EQ(456, branch1.wait());
EXPECT_EQ(789, branch2.wait());
}
struct RefcountedInt: public Refcounted {
......@@ -406,76 +414,72 @@ struct RefcountedInt: public Refcounted {
TEST(Async, ForkRef) {
SimpleEventLoop loop;
auto outer = loop.evalLater([&]() {
Promise<Own<RefcountedInt>> promise = loop.evalLater([&]() {
return refcounted<RefcountedInt>(123);
});
auto fork = promise.fork();
auto branch1 = fork.addBranch().then([](Own<const RefcountedInt>&& i) {
EXPECT_EQ(123, i->i);
return 456;
});
auto branch2 = fork.addBranch().then([](Own<const RefcountedInt>&& i) {
EXPECT_EQ(123, i->i);
return 789;
});
Promise<Own<RefcountedInt>> promise = evalLater([&]() {
return refcounted<RefcountedInt>(123);
});
{
auto releaseFork = kj::mv(fork);
}
auto fork = promise.fork();
EXPECT_EQ(456, loop.wait(kj::mv(branch1)));
EXPECT_EQ(789, loop.wait(kj::mv(branch2)));
auto branch1 = fork.addBranch().then([](Own<const RefcountedInt>&& i) {
EXPECT_EQ(123, i->i);
return 456;
});
auto branch2 = fork.addBranch().then([](Own<const RefcountedInt>&& i) {
EXPECT_EQ(123, i->i);
return 789;
});
loop.wait(kj::mv(outer));
{
auto releaseFork = kj::mv(fork);
}
EXPECT_EQ(456, branch1.wait());
EXPECT_EQ(789, branch2.wait());
}
TEST(Async, ExclusiveJoin) {
{
SimpleEventLoop loop;
auto left = loop.evalLater([&]() { return 123; });
auto left = evalLater([&]() { return 123; });
auto right = newPromiseAndFulfiller<int>(); // never fulfilled
auto promise = loop.exclusiveJoin(kj::mv(left), kj::mv(right.promise));
left.exclusiveJoin(kj::mv(right.promise));
EXPECT_EQ(123, loop.wait(kj::mv(promise)));
EXPECT_EQ(123, left.wait());
}
{
SimpleEventLoop loop;
auto left = newPromiseAndFulfiller<int>(); // never fulfilled
auto right = loop.evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 123; });
auto promise = loop.exclusiveJoin(kj::mv(left.promise), kj::mv(right));
left.promise.exclusiveJoin(kj::mv(right));
EXPECT_EQ(123, loop.wait(kj::mv(promise)));
EXPECT_EQ(123, left.promise.wait());
}
{
SimpleEventLoop loop;
auto left = loop.evalLater([&]() { return 123; });
auto right = loop.evalLater([&]() { return 456; });
auto left = evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 456; });
auto promise = loop.exclusiveJoin(kj::mv(left), kj::mv(right));
left.exclusiveJoin(kj::mv(right));
EXPECT_EQ(123, loop.wait(kj::mv(promise)));
EXPECT_EQ(123, left.wait());
}
{
SimpleEventLoop loop;
auto right = loop.evalLater([&]() { return 456; });
auto left = loop.evalLater([&]() { return 123; });
auto right = evalLater([&]() { return 456; });
auto left = evalLater([&]() { return 123; });
auto promise = loop.exclusiveJoin(kj::mv(left), kj::mv(right));
left.exclusiveJoin(kj::mv(right));
EXPECT_EQ(456, loop.wait(kj::mv(promise)));
EXPECT_EQ(456, left.wait());
}
}
......@@ -495,24 +499,24 @@ TEST(Async, TaskSet) {
int counter = 0;
tasks.add(loop.evalLater([&]() {
tasks.add(evalLater([&]() {
EXPECT_EQ(0, counter++);
}));
tasks.add(loop.evalLater([&]() {
tasks.add(evalLater([&]() {
EXPECT_EQ(1, counter++);
KJ_FAIL_ASSERT("example TaskSet failure") { break; }
}));
tasks.add(loop.evalLater([&]() {
tasks.add(evalLater([&]() {
EXPECT_EQ(2, counter++);
}));
(void)loop.evalLater([&]() {
(void)evalLater([&]() {
ADD_FAILURE() << "Promise without waiter shouldn't execute.";
});
loop.wait(loop.evalLater([&]() {
evalLater([&]() {
EXPECT_EQ(3, counter++);
}));
}).wait();
EXPECT_EQ(4, counter);
EXPECT_EQ(1u, errorHandler.exceptionCount);
......@@ -525,22 +529,26 @@ TEST(Async, EventLoopGuarded) {
{
EXPECT_EQ(123, guarded.getValue());
// We're not in the event loop, so the function will be applied later.
auto promise = guarded.applyNow([](int& i) -> const char* {
EXPECT_EQ(123, i);
i = 234;
return "foo";
kj::Promise<const char*> promise = nullptr;
Thread([&]() {
// We're not in the event loop, so the function will be applied later.
promise = guarded.applyNow([](int& i) -> const char* {
EXPECT_EQ(123, i);
i = 234;
return "foo";
});
});
EXPECT_EQ(123, guarded.getValue());
EXPECT_STREQ("foo", loop.wait(kj::mv(promise)));
EXPECT_STREQ("foo", promise.wait());
EXPECT_EQ(234, guarded.getValue());
}
{
auto promise = loop.evalLater([&]() {
auto promise = evalLater([&]() {
EXPECT_EQ(234, guarded.getValue());
// Since we're in the event loop, applyNow() will apply synchronously.
......@@ -555,13 +563,13 @@ TEST(Async, EventLoopGuarded) {
return kj::mv(promise);
});
EXPECT_STREQ("bar", loop.wait(kj::mv(promise)));
EXPECT_STREQ("bar", promise.wait());
EXPECT_EQ(345, guarded.getValue());
}
{
auto promise = loop.evalLater([&]() {
auto promise = evalLater([&]() {
EXPECT_EQ(345, guarded.getValue());
// applyLater() is never synchronous.
......@@ -576,7 +584,7 @@ TEST(Async, EventLoopGuarded) {
return kj::mv(promise);
});
EXPECT_STREQ("baz", loop.wait(kj::mv(promise)));
EXPECT_STREQ("baz", promise.wait());
EXPECT_EQ(456, guarded.getValue());
}
......@@ -596,20 +604,20 @@ TEST(Async, Attach) {
SimpleEventLoop loop;
Promise<int> promise = loop.evalLater([&]() {
Promise<int> promise = evalLater([&]() {
EXPECT_FALSE(destroyed);
return 123;
});
promise.attach(kj::heap<DestructorDetector>(destroyed));
promise = loop.there(kj::mv(promise), [&](int i) {
promise = promise.then([&](int i) {
EXPECT_TRUE(destroyed);
return i + 321;
});
EXPECT_FALSE(destroyed);
EXPECT_EQ(444, loop.wait(kj::mv(promise)));
EXPECT_EQ(444, promise.wait());
EXPECT_TRUE(destroyed);
}
......@@ -618,20 +626,16 @@ TEST(Async, EagerlyEvaluate) {
SimpleEventLoop loop;
Promise<void> promise = nullptr;
loop.wait(loop.evalLater([&]() {
promise = Promise<void>(READY_NOW).then([&]() {
called = true;
});
}));
loop.wait(loop.evalLater([]() {}));
Promise<void> promise = Promise<void>(READY_NOW).then([&]() {
called = true;
});
evalLater([]() {}).wait();
EXPECT_FALSE(called);
promise.eagerlyEvaluate(loop);
loop.wait(loop.evalLater([]() {}));
evalLater([]() {}).wait();
EXPECT_TRUE(called);
}
......
......@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <gtest/gtest.h>
#include <pthread.h>
namespace kj {
......@@ -41,13 +42,6 @@ inline void delay() { usleep(10000); }
#define EXPECT_SI_CODE(a,b)
#endif
class DummyErrorHandler: public TaskSet::ErrorHandler {
public:
void taskFailed(kj::Exception&& exception) override {
kj::throwRecoverableException(kj::mv(exception));
}
};
class AsyncUnixTest: public testing::Test {
public:
static void SetUpTestCase() {
......@@ -61,7 +55,7 @@ TEST_F(AsyncUnixTest, Signals) {
kill(getpid(), SIGUSR2);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
siginfo_t info = loop.onSignal(SIGUSR2).wait();
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}
......@@ -79,7 +73,7 @@ TEST_F(AsyncUnixTest, SignalWithValue) {
value.sival_int = 123;
sigqueue(getpid(), SIGUSR2, value);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
siginfo_t info = loop.onSignal(SIGUSR2).wait();
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_SI_CODE(SI_QUEUE, info.si_code);
EXPECT_EQ(123, info.si_value.sival_int);
......@@ -88,66 +82,48 @@ TEST_F(AsyncUnixTest, SignalWithValue) {
TEST_F(AsyncUnixTest, SignalsMultiListen) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler);
tasks.add(loop.onSignal(SIGIO).thenInAnyThread([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal.";
}));
loop.daemonize(loop.onSignal(SIGIO).then([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal.";
}));
kill(getpid(), SIGUSR2);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
siginfo_t info = loop.onSignal(SIGUSR2).wait();
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}
TEST_F(AsyncUnixTest, SignalsMultiReceive) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
kill(getpid(), SIGUSR2);
kill(getpid(), SIGIO);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
siginfo_t info = loop.onSignal(SIGUSR2).wait();
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
info = loop.wait(loop.onSignal(SIGIO));
info = loop.onSignal(SIGIO).wait();
EXPECT_EQ(SIGIO, info.si_signo);
EXPECT_SI_CODE(SI_USER, info.si_code);
}
TEST_F(AsyncUnixTest, SignalsAsync) {
// Arrange for another thread to wait on a UnixEventLoop...
auto exitThread = newPromiseAndFulfiller<void>();
UnixEventLoop unixLoop;
UnixEventLoop loop;
// Arrange for a signal to be sent from another thread.
pthread_t mainThread = pthread_self();
Thread thread([&]() {
unixLoop.wait(kj::mv(exitThread.promise));
delay();
pthread_kill(mainThread, SIGUSR2);
});
KJ_DEFER(exitThread.fulfiller->fulfill());
// Arrange to catch a signal in the other thread. But we haven't sent one yet.
bool received = false;
Promise<void> promise = unixLoop.there(unixLoop.onSignal(SIGUSR2),
[&](siginfo_t&& info) {
received = true;
EXPECT_EQ(SIGUSR2, info.si_signo);
siginfo_t info = loop.onSignal(SIGUSR2).wait();
EXPECT_EQ(SIGUSR2, info.si_signo);
#if __linux__
EXPECT_SI_CODE(SI_TKILL, info.si_code);
EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
});
delay();
EXPECT_FALSE(received);
thread.sendSignal(SIGUSR2);
SimpleEventLoop mainLoop;
mainLoop.wait(kj::mv(promise));
EXPECT_TRUE(received);
}
TEST_F(AsyncUnixTest, Poll) {
......@@ -158,34 +134,31 @@ TEST_F(AsyncUnixTest, Poll) {
KJ_SYSCALL(pipe(pipefds));
KJ_SYSCALL(write(pipefds[1], "foo", 3));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds[0], POLLIN | POLLPRI)));
EXPECT_EQ(POLLIN, loop.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait());
}
TEST_F(AsyncUnixTest, PollMultiListen) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
int bogusPipefds[2];
KJ_SYSCALL(pipe(bogusPipefds));
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
TaskSet tasks(loop, dummyHandler);
tasks.add(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).thenInAnyThread([](short s) {
KJ_DBG(s);
ADD_FAILURE() << "Received wrong poll.";
}));
loop.daemonize(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) {
KJ_DBG(s);
ADD_FAILURE() << "Received wrong poll.";
}));
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(write(pipefds[1], "foo", 3));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds[0], POLLIN | POLLPRI)));
EXPECT_EQ(POLLIN, loop.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait());
}
TEST_F(AsyncUnixTest, PollMultiReceive) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
......@@ -197,40 +170,24 @@ TEST_F(AsyncUnixTest, PollMultiReceive) {
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
KJ_SYSCALL(write(pipefds2[1], "bar", 3));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds[0], POLLIN | POLLPRI)));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds2[0], POLLIN | POLLPRI)));
EXPECT_EQ(POLLIN, loop.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait());
EXPECT_EQ(POLLIN, loop.onFdEvent(pipefds2[0], POLLIN | POLLPRI).wait());
}
TEST_F(AsyncUnixTest, PollAsync) {
// Arrange for another thread to wait on a UnixEventLoop...
auto exitThread = newPromiseAndFulfiller<void>();
UnixEventLoop unixLoop;
Thread thread([&]() {
unixLoop.wait(kj::mv(exitThread.promise));
});
KJ_DEFER(exitThread.fulfiller->fulfill());
UnixEventLoop loop;
// Make a pipe and wait on its read end in another thread. But don't write to it yet.
// Make a pipe and wait on its read end while another thread writes to it.
int pipefds[2];
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(pipe(pipefds));
bool received = false;
Promise<void> promise = unixLoop.there(unixLoop.onFdEvent(pipefds[0], POLLIN | POLLPRI),
[&](short events) {
received = true;
EXPECT_EQ(POLLIN, events);
Thread thread([&]() {
delay();
KJ_SYSCALL(write(pipefds[1], "foo", 3));
});
delay();
EXPECT_FALSE(received);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
SimpleEventLoop mainLoop;
mainLoop.wait(kj::mv(promise));
EXPECT_TRUE(received);
// Wait for the event in this thread.
EXPECT_EQ(POLLIN, loop.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait());
}
} // namespace kj
......@@ -165,18 +165,27 @@ bool EventLoop::isCurrent() const {
}
EventLoop::EventLoop()
: daemons(kj::heap<_::TaskSetImpl>(*this, _::LoggingErrorHandler::instance)) {}
: daemons(kj::heap<_::TaskSetImpl>(*this, _::LoggingErrorHandler::instance)) {
KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop.");
threadLocalEventLoop = this;
}
EventLoop::~EventLoop() noexcept(false) {}
EventLoop::~EventLoop() noexcept(false) {
KJ_REQUIRE(threadLocalEventLoop == this,
"EventLoop being destroyed in a different thread than it was created.");
threadLocalEventLoop = nullptr;
queue.clearCallback();
}
void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result) {
EventLoop* oldEventLoop = threadLocalEventLoop;
threadLocalEventLoop = this;
KJ_DEFER(threadLocalEventLoop = oldEventLoop);
KJ_REQUIRE(!running, "wait() is not allowed from within event callbacks.");
BoolEvent event(*this);
event.fired = node->onReady(event);
running = true;
KJ_DEFER(running = false);
while (!event.fired) {
KJ_IF_MAYBE(event, queue.peek(nullptr)) {
// Arrange for events armed during the event callback to be inserted at the beginning
......
......@@ -207,8 +207,13 @@ class EventLoop: private _::NewJobCallback {
// EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
// documentation for `Promise`.
//
// You will need to construct an `EventLoop` at the top level of your program. You can then
// use it to construct some promises and wait on the result. Example:
// Each thread can have at most one EventLoop. When an EventLoop is created, it becomes the
// default loop for the current thread. Async APIs require that the thread has a current
// EventLoop, or they will throw exceptions.
//
// Generally, you will want to construct an `EventLoop` at the top level of your program, e.g.
// in the main() function, or in the start function of a thread. You can then use it to
// construct some promises and wait on the result. Example:
//
// int main() {
// SimpleEventLoop loop;
......@@ -243,30 +248,24 @@ public:
// Run the event loop until the promise is fulfilled, then return its result. If the promise
// is rejected, throw an exception.
//
// It is possible to call wait() multiple times on the same event loop simultaneously, but you
// must be very careful about this. Here's the deal:
// - If wait() is called from thread A when it is already being executed in thread B, then
// thread A will block at least until thread B's call to wait() completes, _even if_ the
// promise is fulfilled before that.
// - If wait() is called recursively from a thread in which wait() is already running, then
// the inner wait() will proceed, but the outer wait() obviously cannot return until the inner
// wait() completes.
// - Keep in mind that while wait() is running the event loop, it may be firing events that have
// nothing to do with the thing you're actually waiting for. Avoid holding any mutex locks
// when you call wait() as if some other event handler happens to try to take that lock, you
// will deadlock.
// wait() cannot be called recursively -- that is, an event callback cannot call wait().
// Instead, callbacks that need to perform more async operations should return a promise and
// rely on promise chaining.
//
// In general, it is only a good idea to use `wait()` in high-level code that has a simple
// goal, e.g. in the main() function of a program that does one or two specific things and then
// exits. On the other hand, `wait()` should be avoided in library code, unless you spawn a
// private thread and event loop to use it on. Put another way, `wait()` is useful for quick
// prototyping but generally bad for "real code".
// wait() is primarily useful at the top level of a program -- typically, within the function
// that allocated the EventLoop. For example, a program that performs one or two RPCs and then
// exits would likely use wait() in its main() function to wait on each RPC. On the other hand,
// server-side code generally cannot use wait(), because it has to be able to accept multiple
// requests at once.
//
// If the promise is rejected, `wait()` throws an exception. This exception is usually fatal,
// so if compiled with -fno-exceptions, the process will abort. You may work around this by
// using `there()` with an error handler to handle this case. If your error handler throws a
// non-fatal exception and then recovers by returning a dummy value, wait() will also throw a
// non-fatal exception and return the same dummy value.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
template <typename Func>
PromiseForResult<Func, void> evalLater(Func&& func) const KJ_WARN_UNUSED_RESULT;
......@@ -394,6 +393,9 @@ private:
Event& event;
};
bool running = false;
// True while looping -- wait() is then not allowed.
_::WorkQueue<EventJob> queue;
Maybe<_::WorkQueue<EventJob>::JobWrapper&> insertionPoint;
......@@ -418,6 +420,7 @@ private:
template <typename>
friend class Promise;
friend Promise<void> yield();
};
// -------------------------------------------------------------------
......@@ -634,11 +637,7 @@ public:
// up doing the same thing as then(); don't use this unless you really know you need it.
T wait();
// Equivalent to `EventLoop::current().wait(kj::mv(*this))`. WARNING: Although `wait()`
// advances the event loop, calls to `wait()` obviously can only return in the reverse of the
// order in which they were made. `wait()` should therefore be considered a hack that should be
// avoided. Consider using it only in high-level and one-off code. In deep library code, use
// `then()` instead.
// Equivalent to `EventLoop::current().wait(kj::mv(*this))`.
//
// Note that `wait()` consumes the promise on which it is called, in the sense of move semantics.
// After returning, the promise is no longer valid, and cannot be `wait()`ed on or `then()`ed
......@@ -784,6 +783,11 @@ constexpr _::Void READY_NOW = _::Void();
// Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
// cast to `Promise<void>`.
template <typename Func>
inline PromiseForResult<Func, void> evalLater(Func&& func) {
return EventLoop::current().evalLater(func);
}
// -------------------------------------------------------------------
// Hack for creating a lambda that holds an owned pointer.
......@@ -1481,7 +1485,7 @@ T EventLoop::wait(Promise<T>&& promise) {
}
template <typename Func>
auto EventLoop::evalLater(Func&& func) const -> PromiseForResult<Func, void> {
PromiseForResult<Func, void> EventLoop::evalLater(Func&& func) const {
// Invoke thereImpl() on yieldIfSameThread(). Always spark the result.
return PromiseForResult<Func, void>(false,
_::spark<_::FixVoid<_::JoinPromises<_::ReturnType<Func, void>>>>(
......
......@@ -166,6 +166,8 @@ public:
// Either the job's complete() or cancel() method will be called exactly once and will have
// returned before the Own<const Job>'s destructor finishes.
void clearCallback() { newJobCallback = nullptr; }
private:
JobWrapper* head = nullptr;
// Pointer to the first job.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment