Commit c80258c8 authored by Kenton Varda's avatar Kenton Varda

kj::runIoEventLoop() is a shortcut for setting up an EventLoop that can do I/O,…

kj::runIoEventLoop() is a shortcut for setting up an EventLoop that can do I/O, without specifying a platform-specific EventLoop.
parent d475caca
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include "serialize.h" #include "serialize.h"
#include <kj/debug.h> #include <kj/debug.h>
#include <kj/thread.h> #include <kj/thread.h>
#include <kj/async-unix.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
...@@ -44,9 +43,9 @@ public: ...@@ -44,9 +43,9 @@ public:
void write(const void* buffer, size_t size) override { void write(const void* buffer, size_t size) override {
while (size > 0) { while (size > 0) {
usleep(5000);
size_t n = rand() % size + 1; size_t n = rand() % size + 1;
inner.write(buffer, n); inner.write(buffer, n);
usleep(10000);
buffer = reinterpret_cast<const byte*>(buffer) + n; buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n; size -= n;
} }
...@@ -113,8 +112,6 @@ protected: ...@@ -113,8 +112,6 @@ protected:
}; };
TEST_F(SerializeAsyncTest, ParseAsync) { TEST_F(SerializeAsyncTest, ParseAsync) {
kj::UnixEventLoop loop;
auto input = kj::AsyncInputStream::wrapFd(fds[0]); auto input = kj::AsyncInputStream::wrapFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]); kj::FdOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput); FragmentingOutputStream output(rawOutput);
...@@ -122,21 +119,18 @@ TEST_F(SerializeAsyncTest, ParseAsync) { ...@@ -122,21 +119,18 @@ TEST_F(SerializeAsyncTest, ParseAsync) {
TestMessageBuilder message(1); TestMessageBuilder message(1);
initTestMessage(message.getRoot<TestAllTypes>()); initTestMessage(message.getRoot<TestAllTypes>());
auto promise = loop.evalLater([&]() {
return readMessage(*input);
});
kj::Thread thread([&]() { kj::Thread thread([&]() {
writeMessage(output, message); writeMessage(output, message);
}); });
auto received = loop.wait(kj::mv(promise)); auto received = kj::runIoEventLoop([&]() {
return readMessage(*input);
});
checkTestMessage(received->getRoot<TestAllTypes>()); checkTestMessage(received->getRoot<TestAllTypes>());
} }
TEST_F(SerializeAsyncTest, ParseAsyncOddSegmentCount) { TEST_F(SerializeAsyncTest, ParseAsyncOddSegmentCount) {
kj::UnixEventLoop loop;
auto input = kj::AsyncInputStream::wrapFd(fds[0]); auto input = kj::AsyncInputStream::wrapFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]); kj::FdOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput); FragmentingOutputStream output(rawOutput);
...@@ -144,21 +138,18 @@ TEST_F(SerializeAsyncTest, ParseAsyncOddSegmentCount) { ...@@ -144,21 +138,18 @@ TEST_F(SerializeAsyncTest, ParseAsyncOddSegmentCount) {
TestMessageBuilder message(7); TestMessageBuilder message(7);
initTestMessage(message.getRoot<TestAllTypes>()); initTestMessage(message.getRoot<TestAllTypes>());
auto promise = loop.evalLater([&]() {
return readMessage(*input);
});
kj::Thread thread([&]() { kj::Thread thread([&]() {
writeMessage(output, message); writeMessage(output, message);
}); });
auto received = loop.wait(kj::mv(promise)); auto received = kj::runIoEventLoop([&]() {
return readMessage(*input);
});
checkTestMessage(received->getRoot<TestAllTypes>()); checkTestMessage(received->getRoot<TestAllTypes>());
} }
TEST_F(SerializeAsyncTest, ParseAsyncEvenSegmentCount) { TEST_F(SerializeAsyncTest, ParseAsyncEvenSegmentCount) {
kj::UnixEventLoop loop;
auto input = kj::AsyncInputStream::wrapFd(fds[0]); auto input = kj::AsyncInputStream::wrapFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]); kj::FdOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput); FragmentingOutputStream output(rawOutput);
...@@ -166,21 +157,18 @@ TEST_F(SerializeAsyncTest, ParseAsyncEvenSegmentCount) { ...@@ -166,21 +157,18 @@ TEST_F(SerializeAsyncTest, ParseAsyncEvenSegmentCount) {
TestMessageBuilder message(10); TestMessageBuilder message(10);
initTestMessage(message.getRoot<TestAllTypes>()); initTestMessage(message.getRoot<TestAllTypes>());
auto promise = loop.evalLater([&]() {
return readMessage(*input);
});
kj::Thread thread([&]() { kj::Thread thread([&]() {
writeMessage(output, message); writeMessage(output, message);
}); });
auto received = loop.wait(kj::mv(promise)); auto received = kj::runIoEventLoop([&]() {
return readMessage(*input);
});
checkTestMessage(received->getRoot<TestAllTypes>()); checkTestMessage(received->getRoot<TestAllTypes>());
} }
TEST_F(SerializeAsyncTest, WriteAsync) { TEST_F(SerializeAsyncTest, WriteAsync) {
kj::UnixEventLoop loop;
auto output = kj::AsyncOutputStream::wrapFd(fds[1]); auto output = kj::AsyncOutputStream::wrapFd(fds[1]);
TestMessageBuilder message(1); TestMessageBuilder message(1);
...@@ -199,14 +187,12 @@ TEST_F(SerializeAsyncTest, WriteAsync) { ...@@ -199,14 +187,12 @@ TEST_F(SerializeAsyncTest, WriteAsync) {
} }
}); });
loop.wait(loop.evalLater([&]() { kj::runIoEventLoop([&]() {
return writeMessage(*output, message); return writeMessage(*output, message);
})); });
} }
TEST_F(SerializeAsyncTest, WriteAsyncOddSegmentCount) { TEST_F(SerializeAsyncTest, WriteAsyncOddSegmentCount) {
kj::UnixEventLoop loop;
auto output = kj::AsyncOutputStream::wrapFd(fds[1]); auto output = kj::AsyncOutputStream::wrapFd(fds[1]);
TestMessageBuilder message(7); TestMessageBuilder message(7);
...@@ -225,14 +211,12 @@ TEST_F(SerializeAsyncTest, WriteAsyncOddSegmentCount) { ...@@ -225,14 +211,12 @@ TEST_F(SerializeAsyncTest, WriteAsyncOddSegmentCount) {
} }
}); });
loop.wait(loop.evalLater([&]() { kj::runIoEventLoop([&]() {
return writeMessage(*output, message); return writeMessage(*output, message);
})); });
} }
TEST_F(SerializeAsyncTest, WriteAsyncEvenSegmentCount) { TEST_F(SerializeAsyncTest, WriteAsyncEvenSegmentCount) {
kj::UnixEventLoop loop;
auto output = kj::AsyncOutputStream::wrapFd(fds[1]); auto output = kj::AsyncOutputStream::wrapFd(fds[1]);
TestMessageBuilder message(10); TestMessageBuilder message(10);
...@@ -251,9 +235,9 @@ TEST_F(SerializeAsyncTest, WriteAsyncEvenSegmentCount) { ...@@ -251,9 +235,9 @@ TEST_F(SerializeAsyncTest, WriteAsyncEvenSegmentCount) {
} }
}); });
loop.wait(loop.evalLater([&]() { kj::runIoEventLoop([&]() {
return writeMessage(*output, message); return writeMessage(*output, message);
})); });
} }
} // namespace } // namespace
......
...@@ -165,5 +165,24 @@ TEST(AsyncIo, TwoWayPipe) { ...@@ -165,5 +165,24 @@ TEST(AsyncIo, TwoWayPipe) {
EXPECT_EQ("bar", result2); EXPECT_EQ("bar", result2);
} }
TEST(AsyncIo, RunIoEventLoop) {
auto pipe = newOneWayPipe();
char receiveBuffer[4];
String result = runIoEventLoop([&]() {
auto promise1 = pipe.out->write("foo", 3);
auto promise2 = pipe.in->tryRead(receiveBuffer, 3, 4)
.then([&](size_t n) {
EXPECT_EQ(3u, n);
return heapString(receiveBuffer, n);
});
return promise1.then(mvCapture(promise2, [](Promise<String> promise2) { return promise2; }));
});
EXPECT_EQ("foo", result);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -648,4 +648,12 @@ TwoWayPipe newTwoWayPipe() { ...@@ -648,4 +648,12 @@ TwoWayPipe newTwoWayPipe() {
return TwoWayPipe { { heap<Socket>(fds[0]), heap<Socket>(fds[1]) } }; return TwoWayPipe { { heap<Socket>(fds[0]), heap<Socket>(fds[1]) } };
} }
namespace _ { // private
void runIoEventLoopInternal(IoLoopMain& func) {
UnixEventLoop loop;
func.run(loop);
}
} // namespace _ (private)
} // namespace kj } // namespace kj
...@@ -180,6 +180,67 @@ TwoWayPipe newTwoWayPipe(); ...@@ -180,6 +180,67 @@ TwoWayPipe newTwoWayPipe();
// Creates two AsyncIoStreams representing the two ends of a two-way OS pipe (created with // Creates two AsyncIoStreams representing the two ends of a two-way OS pipe (created with
// socketpair(2)). Data written to one end can be read from the other. // socketpair(2)). Data written to one end can be read from the other.
// =======================================================================================
namespace _ { // private
class IoLoopMain {
public:
virtual void run(EventLoop& loop) = 0;
};
template <typename Func, typename Result>
class IoLoopMainImpl: public IoLoopMain {
public:
IoLoopMainImpl(Func&& func): func(kj::mv(func)) {}
void run(EventLoop& loop) override {
result = space.construct(loop.wait(loop.evalLater(func)));
}
Result getResult() { return kj::mv(*result); }
private:
Func func;
SpaceFor<Result> space;
Own<Result> result;
};
template <typename Func>
class IoLoopMainImpl<Func, void>: public IoLoopMain {
public:
IoLoopMainImpl(Func&& func): func(kj::mv(func)) {}
void run(EventLoop& loop) override {
loop.wait(loop.evalLater(func));
}
void getResult() {}
private:
Func func;
};
void runIoEventLoopInternal(IoLoopMain& func);
} // namespace _ (private)
template <typename Func>
auto runIoEventLoop(Func&& func) -> decltype(instance<EventLoop&>().wait(func())) {
// Sets up an appropriate EventLoop for doing I/O, then executes the given function. The function
// returns a promise. The EventLoop will continue running until that promise resolves, then the
// whole function will return its resolution. On return, the EventLoop is destroyed, cancelling
// all outstanding I/O.
//
// This function is great for running inside main() to set up an Async I/O environment without
// specifying a platform-specific EventLoop or other such things.
// TODO(cleanup): I wanted to forward-declare this function in order to document it separate
// from the implementation details but GCC claimed the two declarations were overloads rather
// than the same function, even though the signature was identical. FFFFFFFFFFUUUUUUUUUUUUUUU-
typedef decltype(instance<EventLoop&>().wait(instance<Func>()())) Result;
_::IoLoopMainImpl<Func, Result> func2(kj::fwd<Func>(func));
_::runIoEventLoopInternal(func2);
return func2.getResult();
}
} // namespace kj } // namespace kj
#endif // KJ_ASYNC_IO_H_ #endif // KJ_ASYNC_IO_H_
linux-gcc-4.7 1731 ./super-test.sh tmpdir capnp-gcc-4.7 quick linux-gcc-4.7 1735 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1734 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8 linux-gcc-4.8 1738 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1754 ./super-test.sh tmpdir capnp-clang quick clang linux-clang 1758 ./super-test.sh tmpdir capnp-clang quick clang
mac 807 ./super-test.sh remote beat caffeinate quick mac 807 ./super-test.sh remote beat caffeinate quick
cygwin 810 ./super-test.sh remote Kenton@flashman quick cygwin 810 ./super-test.sh remote Kenton@flashman quick
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment