Commit 113b4d84 authored by Kenton Varda's avatar Kenton Varda

EventLoopGuarded is like MutexGuarded but synchronizes by queuing operations to a single EventLoop.

parent bf69c94b
...@@ -347,21 +347,18 @@ private: ...@@ -347,21 +347,18 @@ private:
class LocalClient final: public ClientHook, public kj::Refcounted { class LocalClient final: public ClientHook, public kj::Refcounted {
public: public:
LocalClient(const kj::EventLoop& eventLoop, kj::Own<Capability::Server>&& server) LocalClient(const kj::EventLoop& eventLoop, kj::Own<Capability::Server>&& server)
: eventLoop(eventLoop), server(kj::mv(server)) {} : server(eventLoop, kj::mv(server)) {}
Request<ObjectPointer, ObjectPointer> newCall( Request<ObjectPointer, ObjectPointer> newCall(
uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override { uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override {
auto hook = kj::heap<LocalRequest>( auto hook = kj::heap<LocalRequest>(
eventLoop, interfaceId, methodId, firstSegmentWordSize, kj::addRef(*this)); server.getEventLoop(), interfaceId, methodId, firstSegmentWordSize, kj::addRef(*this));
auto root = hook->message->getRoot(); // Do not inline `root` -- kj::mv may happen first. auto root = hook->message->getRoot(); // Do not inline `root` -- kj::mv may happen first.
return Request<ObjectPointer, ObjectPointer>(root, kj::mv(hook)); return Request<ObjectPointer, ObjectPointer>(root, kj::mv(hook));
} }
VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId, VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
kj::Own<CallContextHook>&& context) const override { kj::Own<CallContextHook>&& context) const override {
// We can const-cast the server because we're synchronizing on the event loop.
auto server = const_cast<Capability::Server*>(this->server.get());
auto contextPtr = context.get(); auto contextPtr = context.get();
// We don't want to actually dispatch the call synchronously, because: // We don't want to actually dispatch the call synchronously, because:
...@@ -374,26 +371,26 @@ public: ...@@ -374,26 +371,26 @@ public:
// //
// Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
// complete before 'whenMoreResolved()' promises resolve. // complete before 'whenMoreResolved()' promises resolve.
auto promise = eventLoop.evalLater( auto promise = server.applyLater(
[=]() { [=](kj::Own<Capability::Server>& server) {
return server->dispatchCall(interfaceId, methodId, return server->dispatchCall(interfaceId, methodId,
CallContext<ObjectPointer, ObjectPointer>(*contextPtr)); CallContext<ObjectPointer, ObjectPointer>(*contextPtr));
}); });
// Make sure that this client cannot be destroyed until the promise completes. // Make sure that this client cannot be destroyed until the promise completes.
promise = eventLoop.there(kj::mv(promise), kj::mvCapture(kj::addRef(*this), promise = promise.thenInAnyThread(kj::mvCapture(kj::addRef(*this),
[=](kj::Own<const LocalClient>&& ref) {})); [](kj::Own<const LocalClient>&& ref) {}));
// We have to fork this promise for the pipeline to receive a copy of the answer. // We have to fork this promise for the pipeline to receive a copy of the answer.
auto forked = eventLoop.fork(kj::mv(promise)); auto forked = server.getEventLoop().fork(kj::mv(promise));
auto pipelinePromise = eventLoop.there(forked.addBranch(), kj::mvCapture(context->addRef(), auto pipelinePromise = forked.addBranch().thenInAnyThread(kj::mvCapture(context->addRef(),
[=](kj::Own<CallContextHook>&& context) -> kj::Own<const PipelineHook> { [=](kj::Own<CallContextHook>&& context) -> kj::Own<const PipelineHook> {
context->releaseParams(); context->releaseParams();
return kj::refcounted<LocalPipeline>(kj::mv(context)); return kj::refcounted<LocalPipeline>(kj::mv(context));
})); }));
auto completionPromise = eventLoop.there(forked.addBranch(), kj::mvCapture(context, auto completionPromise = forked.addBranch().thenInAnyThread(kj::mvCapture(context,
[=](kj::Own<CallContextHook>&& context) { [=](kj::Own<CallContextHook>&& context) {
// Nothing to do here. We just wanted to make sure to hold on to a reference to the // Nothing to do here. We just wanted to make sure to hold on to a reference to the
// context even if the pipeline was discarded. // context even if the pipeline was discarded.
...@@ -403,7 +400,7 @@ public: ...@@ -403,7 +400,7 @@ public:
})); }));
return VoidPromiseAndPipeline { kj::mv(completionPromise), return VoidPromiseAndPipeline { kj::mv(completionPromise),
kj::refcounted<QueuedPipeline>(eventLoop, kj::mv(pipelinePromise)) }; kj::refcounted<QueuedPipeline>(server.getEventLoop(), kj::mv(pipelinePromise)) };
} }
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
...@@ -420,8 +417,7 @@ public: ...@@ -420,8 +417,7 @@ public:
} }
private: private:
const kj::EventLoop& eventLoop; kj::EventLoopGuarded<kj::Own<Capability::Server>> server;
kj::Own<Capability::Server> server;
}; };
} // namespace } // namespace
......
...@@ -472,5 +472,69 @@ TEST(Async, TaskSet) { ...@@ -472,5 +472,69 @@ TEST(Async, TaskSet) {
EXPECT_EQ(1u, errorHandler.exceptionCount); EXPECT_EQ(1u, errorHandler.exceptionCount);
} }
TEST(Async, EventLoopGuarded) {
SimpleEventLoop loop;
EventLoopGuarded<int> guarded(loop, 123);
{
EXPECT_EQ(123, guarded.getValue());
// We're not in the event loop, so the function will be applied later.
auto promise = guarded.applyNow([](int& i) -> const char* {
EXPECT_EQ(123, i);
i = 234;
return "foo";
});
EXPECT_EQ(123, guarded.getValue());
EXPECT_STREQ("foo", loop.wait(kj::mv(promise)));
EXPECT_EQ(234, guarded.getValue());
}
{
auto promise = loop.evalLater([&]() {
EXPECT_EQ(234, guarded.getValue());
// Since we're in the event loop, applyNow() will apply synchronously.
auto promise = guarded.applyNow([](int& i) -> const char* {
EXPECT_EQ(234, i);
i = 345;
return "bar";
});
EXPECT_EQ(345, guarded.getValue()); // already applied
return kj::mv(promise);
});
EXPECT_STREQ("bar", loop.wait(kj::mv(promise)));
EXPECT_EQ(345, guarded.getValue());
}
{
auto promise = loop.evalLater([&]() {
EXPECT_EQ(345, guarded.getValue());
// applyLater() is never synchronous.
auto promise = guarded.applyLater([](int& i) -> const char* {
EXPECT_EQ(345, i);
i = 456;
return "baz";
});
EXPECT_EQ(345, guarded.getValue());
return kj::mv(promise);
});
EXPECT_STREQ("baz", loop.wait(kj::mv(promise)));
EXPECT_EQ(456, guarded.getValue());
}
}
} // namespace } // namespace
} // namespace kj } // namespace kj
...@@ -669,6 +669,59 @@ private: ...@@ -669,6 +669,59 @@ private:
friend class EventLoop; friend class EventLoop;
}; };
template <typename T>
class EventLoopGuarded {
// An instance of T that is bound to a particular EventLoop and may only be modified within that
// loop.
public:
template <typename... Params>
inline EventLoopGuarded(const EventLoop& loop, Params&&... params)
: loop(loop), value(kj::fwd<Params>(params)...) {}
const T& getValue() const { return value; }
// Get a const (thread-safe) reference to the value.
const EventLoop& getEventLoop() const { return loop; }
// Get the EventLoop in which this value can be modified.
template <typename Func>
PromiseForResult<Func, T&> applyNow(Func&& func) const KJ_WARN_UNUSED_RESULT {
// Calls the given function, passing the guarded object to it as a mutable reference, and
// returning a pointer to the function's result. When called from within the object's event
// loop, the function runs synchronously, but when called from any other thread, the function
// is queued to run on the object's loop later.
if (loop.isCurrent()) {
return func(const_cast<T&>(value));
} else {
return applyLater(kj::fwd<Func>(func));
}
}
template <typename Func>
PromiseForResult<Func, T&> applyLater(Func&& func) const KJ_WARN_UNUSED_RESULT {
// Like `applyNow` but always queues the function to run later regardless of which thread
// called it.
return loop.evalLater(Capture<Func> { const_cast<T&>(value), kj::fwd<Func>(func) });
}
private:
const EventLoop& loop;
T value;
template <typename Func>
struct Capture {
T& value;
Func func;
decltype(func(value)) operator()() {
return func(value);
}
};
};
class TaskSet { class TaskSet {
// Holds a collection of Promise<void>s and ensures that each executes to completion. Memory // Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
// associated with each promise is automatically freed when the promise completes. Destroying // associated with each promise is automatically freed when the promise completes. Destroying
......
linux-gcc-4.7 1731 ./super-test.sh tmpdir capnp-gcc-4.7 quick linux-gcc-4.7 1731 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1734 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8 linux-gcc-4.8 1734 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1754 ./super-test.sh tmpdir capnp-clang quick clang linux-clang 1754 ./super-test.sh tmpdir capnp-clang quick clang
mac 802 ./super-test.sh remote beat caffeinate quick mac 807 ./super-test.sh remote beat caffeinate quick
cygwin 810 ./super-test.sh remote Kenton@flashman quick cygwin 810 ./super-test.sh remote Kenton@flashman quick
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment