Commit c1e51108 authored by Kenton Varda's avatar Kenton Varda

TaskSet manages executing promises for 'daemon' operations.

parent eda17e08
......@@ -157,14 +157,18 @@ struct Import {
// at exactly the moment that we call addRef() on it.)
};
class RpcConnectionState {
class RpcConnectionState: public kj::TaskSet::ErrorHandler {
public:
RpcConnectionState(const kj::EventLoop& eventLoop,
kj::Own<VatNetworkBase::Connection>&& connection)
: eventLoop(eventLoop), connection(kj::mv(connection)) {
: eventLoop(eventLoop), connection(kj::mv(connection)), tasks(eventLoop, *this) {
tasks.add(messageLoop());
}
void taskFailed(kj::Exception&& exception) override {
// TODO(now): Kill the connection.
}
private:
const kj::EventLoop& eventLoop;
kj::Own<VatNetworkBase::Connection> connection;
......
......@@ -392,5 +392,44 @@ TEST(Async, ForkRef) {
loop.wait(kj::mv(outer));
}
class ErrorHandlerImpl: public TaskSet::ErrorHandler {
public:
uint exceptionCount = 0;
void taskFailed(kj::Exception&& exception) override {
EXPECT_TRUE(exception.getDescription().endsWith("example TaskSet failure"));
++exceptionCount;
}
};
TEST(Async, TaskSet) {
SimpleEventLoop loop;
ErrorHandlerImpl errorHandler;
TaskSet tasks(loop, errorHandler);
int counter = 0;
tasks.add(loop.evalLater([&]() {
EXPECT_EQ(0, counter++);
}));
tasks.add(loop.evalLater([&]() {
EXPECT_EQ(1, counter++);
KJ_FAIL_ASSERT("example TaskSet failure") { break; }
}));
tasks.add(loop.evalLater([&]() {
EXPECT_EQ(2, counter++);
}));
(void)loop.evalLater([&]() {
ADD_FAILURE() << "Promise without waiter shouldn't execute.";
});
loop.wait(loop.evalLater([&]() {
EXPECT_EQ(3, counter++);
}));
EXPECT_EQ(4, counter);
EXPECT_EQ(1, errorHandler.exceptionCount);
}
} // namespace
} // namespace kj
......@@ -24,6 +24,7 @@
#include "async.h"
#include "debug.h"
#include <exception>
#include <map>
#if KJ_USE_FUTEX
#include <unistd.h>
......@@ -265,6 +266,69 @@ void PromiseBase::absolve() {
runCatchingExceptions([this]() { node = nullptr; });
}
class TaskSet::Impl {
public:
inline Impl(const EventLoop& loop, ErrorHandler& errorHandler)
: loop(loop), errorHandler(errorHandler) {}
class Task final: public EventLoop::Event {
public:
Task(const Impl& taskSet, Own<_::PromiseNode>&& nodeParam)
: EventLoop::Event(taskSet.loop), taskSet(taskSet), node(kj::mv(nodeParam)) {
if (node->onReady(*this)) {
// TODO(soon): Only yield cross-thread.
arm(EventLoop::Event::YIELD);
}
}
protected:
void fire() override {
// Get the result.
_::ExceptionOr<_::Void> result;
node->get(result);
// Delete the node, catching any exceptions.
KJ_IF_MAYBE(exception, runCatchingExceptions([this]() {
node = nullptr;
})) {
result.addException(kj::mv(*exception));
}
// Call the error handler if there was an exception.
KJ_IF_MAYBE(e, result.exception) {
taskSet.errorHandler.taskFailed(kj::mv(*e));
}
}
private:
const Impl& taskSet;
kj::Own<_::PromiseNode> node;
};
void add(Promise<void>&& promise) const {
auto task = heap<Task>(*this, _::makeSafeForLoop<_::Void>(kj::mv(promise.node), loop));
Task* ptr = task;
tasks.lockExclusive()->insert(std::make_pair(ptr, kj::mv(task)));
}
private:
const EventLoop& loop;
ErrorHandler& errorHandler;
// TODO(soon): Use a linked list instead. We should factor out the intrusive linked list code
// that appears in EventLoop and ForkHub.
MutexGuarded<std::map<Task*, Own<Task>>> tasks;
};
TaskSet::TaskSet(const EventLoop& loop, ErrorHandler& errorHandler)
: impl(heap<Impl>(loop, errorHandler)) {}
TaskSet::~TaskSet() noexcept(false) {}
void TaskSet::add(Promise<void>&& promise) const {
impl->add(kj::mv(promise));
}
namespace _ { // private
bool PromiseNode::atomicOnReady(EventLoop::Event*& onReadyEvent, EventLoop::Event& newEvent) {
......
......@@ -259,7 +259,7 @@ public:
// non-fatal exception and return the same dummy value.
template <typename Func>
auto evalLater(Func&& func) const -> PromiseForResult<Func, void>;
PromiseForResult<Func, void> evalLater(Func&& func) const KJ_WARN_UNUSED_RESULT;
// Schedule for the given zero-parameter function to be executed in the event loop at some
// point in the near future. Returns a Promise for its result -- or, if `func()` itself returns
// a promise, `evalLater()` returns a Promise for the result of resolving that promise.
......@@ -439,6 +439,7 @@ private:
friend class _::ChainPromiseNode;
template <typename>
friend class Promise;
friend class TaskSet;
};
template <typename T>
......@@ -653,6 +654,37 @@ private:
friend class EventLoop;
};
class TaskSet {
// Holds a collection of Promise<void>s and ensures that each executes to completion. Memory
// associated with each promise is automatically freed when the promise completes. Destroying
// the TaskSet itself automatically cancels all unfinished promises.
//
// This is useful for "daemon" objects that perform background tasks which aren't intended to
// fulfill any particular external promise. The daemon object holds a TaskSet to collect these
// tasks it is working on. This way, if the daemon itself is destroyed, the TaskSet is detroyed
// as well, and everything the daemon is doing is canceled. (The only alternative -- creating
// a promise that owns itself and deletes itself on completion -- does not allow for clean
// shutdown.)
public:
class ErrorHandler {
public:
virtual void taskFailed(kj::Exception&& exception) = 0;
};
TaskSet(const EventLoop& loop, ErrorHandler& errorHandler);
// `loop` will be used to wait on promises. `errorHandler` will be executed any time a task
// throws an exception, and will execute within the given EventLoop.
~TaskSet() noexcept(false);
void add(Promise<void>&& promise) const;
private:
class Impl;
Own<Impl> impl;
};
constexpr _::Void READY_NOW = _::Void();
// Use this when you need a Promise<void> that is already fulfilled -- this value can be implicitly
// cast to `Promise<void>`.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment