Commit 5cb6cf76 authored by Kenton Varda's avatar Kenton Varda

Cleanup: Implement TaskSet using a list rather than a map.

I intended to do this, like, 4 years ago.
parent 7c8870eb
......@@ -38,6 +38,7 @@ class EventLoop;
template <typename T>
class Promise;
class WaitScope;
class TaskSet;
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
......@@ -172,8 +173,6 @@ class ChainPromiseNode;
template <typename T>
class ForkHub;
class TaskSetImpl;
class Event;
class PromiseBase {
......@@ -191,7 +190,7 @@ private:
friend class ChainPromiseNode;
template <typename>
friend class kj::Promise;
friend class TaskSetImpl;
friend class kj::TaskSet;
template <typename U>
friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises);
friend Promise<void> kj::joinPromises(Array<Promise<void>>&& promises);
......
......@@ -23,8 +23,6 @@
#include "debug.h"
#include "vector.h"
#include "threadlocal.h"
#include <exception>
#include <map>
#if KJ_USE_FUTEX
#include <unistd.h>
......@@ -86,32 +84,23 @@ public:
} // namespace
namespace _ { // private
class TaskSetImpl {
public:
inline TaskSetImpl(TaskSet::ErrorHandler& errorHandler)
TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler)
: errorHandler(errorHandler) {}
~TaskSetImpl() noexcept(false) {
// std::map doesn't like it when elements' destructors throw, so carefully disassemble it.
if (!tasks.empty()) {
Vector<Own<Task>> deleteMe(tasks.size());
for (auto& entry: tasks) {
deleteMe.add(kj::mv(entry.second));
}
}
}
TaskSet::~TaskSet() noexcept(false) {}
class Task final: public Event {
public:
Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
class TaskSet::Task final: public _::Event {
public:
Task(TaskSet& taskSet, Own<_::PromiseNode>&& nodeParam)
: taskSet(taskSet), node(kj::mv(nodeParam)) {
node->setSelfPointer(&node);
node->onReady(this);
}
protected:
Maybe<Own<Task>> next;
Maybe<Own<Task>>* prev = nullptr;
protected:
Maybe<Own<Event>> fire() override {
// Get the result.
_::ExceptionOr<_::Void> result;
......@@ -129,11 +118,15 @@ public:
taskSet.errorHandler.taskFailed(kj::mv(*e));
}
// Remove from the task map.
auto iter = taskSet.tasks.find(this);
KJ_ASSERT(iter != taskSet.tasks.end());
Own<Event> self = kj::mv(iter->second);
taskSet.tasks.erase(iter);
// Remove from the task list.
KJ_IF_MAYBE(n, next) {
n->get()->prev = prev;
}
Own<Event> self = kj::mv(KJ_ASSERT_NONNULL(*prev));
KJ_ASSERT(self.get() == this);
*prev = kj::mv(next);
next = nullptr;
prev = nullptr;
return mv(self);
}
......@@ -141,31 +134,38 @@ public:
return node;
}
private:
TaskSetImpl& taskSet;
kj::Own<_::PromiseNode> node;
};
private:
TaskSet& taskSet;
Own<_::PromiseNode> node;
};
void add(Promise<void>&& promise) {
void TaskSet::add(Promise<void>&& promise) {
auto task = heap<Task>(*this, kj::mv(promise.node));
Task* ptr = task;
tasks.insert(std::make_pair(ptr, kj::mv(task)));
KJ_IF_MAYBE(head, tasks) {
head->get()->prev = &task->next;
task->next = kj::mv(tasks);
}
task->prev = &tasks;
tasks = kj::mv(task);
}
kj::String trace() {
kj::String TaskSet::trace() {
kj::Vector<kj::String> traces;
for (auto& entry: tasks) {
traces.add(entry.second->trace());
Maybe<Own<Task>>* ptr = &tasks;
for (;;) {
KJ_IF_MAYBE(task, *ptr) {
traces.add(task->get()->trace());
ptr = &task->get()->next;
} else {
break;
}
return kj::strArray(traces, "\n============================================\n");
}
private:
TaskSet::ErrorHandler& errorHandler;
return kj::strArray(traces, "\n============================================\n");
}
// TODO(perf): Use a linked list instead.
std::map<Task*, Own<Task>> tasks;
};
namespace _ { // private
class LoggingErrorHandler: public TaskSet::ErrorHandler {
public:
......@@ -210,11 +210,11 @@ void EventPort::wake() const {
EventLoop::EventLoop()
: port(_::NullEventPort::instance),
daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::EventLoop(EventPort& port)
: port(port),
daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::~EventLoop() noexcept(false) {
// Destroy all "daemon" tasks, noting that their destructors might try to access the EventLoop
......@@ -524,19 +524,6 @@ kj::String Event::trace() {
// =======================================================================================
TaskSet::TaskSet(ErrorHandler& errorHandler)
: impl(heap<_::TaskSetImpl>(errorHandler)) {}
TaskSet::~TaskSet() noexcept(false) {}
void TaskSet::add(Promise<void>&& promise) {
impl->add(kj::mv(promise));
}
kj::String TaskSet::trace() {
return impl->trace();
}
namespace _ { // private
kj::String PromiseBase::trace() {
......
......@@ -317,7 +317,7 @@ private:
friend PromiseFulfillerPair<U> newPromiseAndFulfiller();
template <typename>
friend class _::ForkHub;
friend class _::TaskSetImpl;
friend class TaskSet;
friend Promise<void> _::yield();
friend class _::NeverDone;
template <typename U>
......@@ -522,8 +522,8 @@ public:
};
TaskSet(ErrorHandler& errorHandler);
// `loop` will be used to wait on promises. `errorHandler` will be executed any time a task
// throws an exception, and will execute within the given EventLoop.
// `errorHandler` will be executed any time a task throws an exception, and will execute within
// the given EventLoop.
~TaskSet() noexcept(false);
......@@ -533,7 +533,10 @@ public:
// Return debug info about all promises currently in the TaskSet.
private:
Own<_::TaskSetImpl> impl;
class Task;
TaskSet::ErrorHandler& errorHandler;
Maybe<Own<Task>> tasks;
};
// =======================================================================================
......@@ -653,7 +656,7 @@ private:
_::Event** tail = &head;
_::Event** depthFirstInsertPoint = &head;
Own<_::TaskSetImpl> daemons;
Own<TaskSet> daemons;
bool turn();
void setRunnable(bool runnable);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment