Commit b2f663a0 authored by Kenton Varda's avatar Kenton Varda

Fix races in WorkQueue, and simplify usage.

parent 7a142713
......@@ -200,11 +200,6 @@ void UnixEventLoop::sleep() {
return;
}
// Make sure we don't wait for any events that are no longer relevant, either because we fired
// them last time around or because the application has discarded the corresponding promises.
impl->signalQueue.cleanup();
impl->pollQueue.cleanup();
sigset_t newMask;
sigemptyset(&newMask);
sigaddset(&newMask, SIGUSR1);
......@@ -222,7 +217,7 @@ void UnixEventLoop::sleep() {
}
kj::Vector<struct pollfd> pollfds;
kj::Vector<const _::WorkQueue<PollJob>::JobWrapper*> pollJobs;
kj::Vector<_::WorkQueue<PollJob>::JobWrapper*> pollJobs;
{
auto job = impl->pollQueue.peek(*impl);
......
......@@ -55,13 +55,15 @@ public:
const Job& get() const;
// Get the wrapped Job.
Maybe<const JobWrapper&> getNext() const;
// Get the next job in the queue. Returns nullptr if there are no more jobs.
Maybe<JobWrapper&> getNext();
// Get the next job in the queue. Returns nullptr if there are no more jobs. Removes any
// jobs in the list that are already canceled.
template <typename Param>
void complete(Param&& param) const;
Maybe<JobWrapper&> complete(Param&& param);
// If the listener has not yet dropped its pointer to the Job, invokes the job's `complete()`
// method with the given parameters.
// method with the given parameters and then removes it from the list. Returns the next job
// in the list.
//
// TODO(someday): This ought to be a variadic template, letting you pass any number of
// params. However, GCC 4.7 and 4.8 appear to get confused by param packs used inside
......@@ -75,6 +77,9 @@ public:
JobWrapper* next = nullptr;
// The JobWrapper cannot be destroyed until this pointer becomes non-null.
JobWrapper** prev = nullptr;
// Pointer to the pointer that points to this.
friend class WorkQueue;
template <typename... Params>
......@@ -98,12 +103,12 @@ public:
// TODO(cleanup): Perhaps there should be an easier way to do this, without the extra
// overhead of Lazy<T>.
Maybe<const JobWrapper&> skipDead() const;
Maybe<JobWrapper&> skipDead();
// Find the first JobWrapper in the list (starting from this one) which hasn't already
// run.
// run. Remove any that are already-run.
};
Maybe<const JobWrapper&> peek(kj::Maybe<NewJobCallback&> callback);
Maybe<JobWrapper&> peek(kj::Maybe<NewJobCallback&> callback);
// Get the first job in the list, or null if the list is empty. Must only be called in the
// thread which is receiving the work.
//
......@@ -112,10 +117,6 @@ public:
// alarms are possible; you must actually peek() again to find out if there is new work. If
// you are sure that you don't need to be notified, pass null for the callback.
void cleanup(uint count = maxValue);
// Goes through the list (up to a max of `count` jobs) and removes any that are no longer
// relevant. Must only be called in the thread receiving the work.
template <typename... Params>
Own<const Job> add(Params&&... params) const;
// Adds an job to the list, passing the given parameters to its constructor. Can be called
......@@ -130,18 +131,20 @@ private:
// Pointer to the first job.
mutable JobWrapper** tail = &head;
// Usually points to the last `next` pointer in the chain (which should be null, since it's the
// last one). However, because `tail` cannot be atomically updated at the same time that `*tail`
// becomes non-null, `tail` may be behind by a step or two. In fact, we are sloppy about
// updating this pointer, so there's a chance it will remain behind indefinitely (if two threads
// adding jobs at the same time race to update `tail`), but it should not be too far behind.
// Points to the `next` pointer of the last-added Job. When adding a new Job, `tail` is
// atomically updated in order to create a well-defined list ordering. Only after successfully
// atomically updating `tail` to point at the new job's `next` can the old `tail` location be
// updated to point at the new job.
//
// The consumer does not pay any attention to `tail`. It only follows `next` pointer until it
// finds one which is null.
mutable NewJobCallback* newJobCallback = nullptr;
// If non-null, run this the next time `add()` is called, then set null.
};
template <typename Job>
Maybe<const typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::peek(
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::peek(
kj::Maybe<NewJobCallback&> callback) {
KJ_IF_MAYBE(c, callback) {
__atomic_store_n(&newJobCallback, c, __ATOMIC_RELEASE);
......@@ -157,71 +160,27 @@ Maybe<const typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::peek(
}
}
template <typename Job>
void WorkQueue<Job>::cleanup(uint count) {
JobWrapper** ptr = &head;
while (count-- > 0) {
JobWrapper* job = __atomic_load_n(ptr, __ATOMIC_ACQUIRE);
if (job == nullptr) {
return;
} else if (job->once.isInitialized()) {
// This job is no longer useful to us.
JobWrapper* next = __atomic_load_n(&job->next, __ATOMIC_ACQUIRE);
if (next == nullptr) {
// We can't delete this job yet because another thread may be modifying its `next`
// pointer. We're at the end of the list, so we might as well return.
return;
} else {
// Unlink this job from the list.
// Note that since *ptr was already determined to be non-null, it now belongs to this
// thread, therefore we don't need to modify it atomically.
*ptr = next;
// If the tail pointer points to this job's next pointer (because a race in add() left
// it behind), be sure to get it updated. (This is unusual.)
if (__atomic_load_n(&tail, __ATOMIC_RELAXED) == &job->next) {
__atomic_store_n(&tail, &next->next, __ATOMIC_RELAXED);
}
// Now we can remove our ref to the job. This may delete it.
job->removeRef();
}
} else {
// Move on to the next job.
ptr = &job->next;
}
}
}
template <typename Job>
template <typename... Params>
Own<const Job> WorkQueue<Job>::add(Params&&... params) const {
JobWrapper* job = new JobWrapper(kj::fwd<Params>(params)...);
Own<const Job> result(&job->job, *job);
JobWrapper** tailCopy = __atomic_load_n(&tail, __ATOMIC_ACQUIRE);
JobWrapper* expected = nullptr;
while (!__atomic_compare_exchange_n(tailCopy, &expected, job, false,
job->prev = __atomic_load_n(&tail, __ATOMIC_ACQUIRE);
while (!__atomic_compare_exchange_n(&tail, &job->prev, &job->next, true,
__ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
// Oops, the tail pointer was out-of-date. Follow it looking for the real tail.
tailCopy = &expected->next;
expected = nullptr;
// Oops, someone else updated the tail pointer. Try again.
}
// Update tail to point at the end. Note that this is sloppy: it's possible that another
// thread has added another job concurrently and we're now moving the tail pointer backwards,
// but that's OK because we'll correct for it being behind the next time we use it.
__atomic_store_n(&tail, &job->next, __ATOMIC_RELEASE);
__atomic_store_n(job->prev, job, __ATOMIC_RELEASE);
if (NewJobCallback* callback = __atomic_load_n(&newJobCallback, __ATOMIC_ACQUIRE)) {
__atomic_store_n(&newJobCallback, nullptr, __ATOMIC_RELAXED);
if (__atomic_load_n(&newJobCallback, __ATOMIC_RELAXED) != nullptr) {
// There is a callback. Do an atomic exchange to acquire it.
NewJobCallback* callback = __atomic_exchange_n(&newJobCallback, nullptr, __ATOMIC_ACQUIRE);
if (callback != nullptr) {
callback->receivedNewJob();
}
}
return kj::mv(result);
}
......@@ -232,7 +191,7 @@ inline const Job& WorkQueue<Job>::JobWrapper::get() const {
}
template <typename Job>
Maybe<const typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::getNext() const {
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::getNext() {
JobWrapper* nextCopy = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
if (nextCopy == nullptr) {
return nullptr;
......@@ -243,10 +202,20 @@ Maybe<const typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::ge
template <typename Job>
template <typename Param>
void WorkQueue<Job>::JobWrapper::complete(Param&& param) const {
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::complete(Param&& param) {
doOnce([&]() {
const_cast<Job&>(job).complete(kj::fwd<Param>(param));
});
JobWrapper* nextCopy = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
if (nextCopy == nullptr) {
return nullptr;
}
*prev = nextCopy;
nextCopy->prev = prev;
removeRef();
return nextCopy->skipDead();
}
template <typename Job>
......@@ -288,14 +257,19 @@ void WorkQueue<Job>::JobWrapper::doOnce(Func&& func) const {
}
template <typename Job>
Maybe<const typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::skipDead() const {
const JobWrapper* current = this;
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::skipDead() {
JobWrapper* current = this;
while (current->once.isInitialized()) {
current = __atomic_load_n(&current->next, __ATOMIC_ACQUIRE);
if (current == nullptr) {
// This job has already been taken care of. Remove it.
JobWrapper* next = __atomic_load_n(&current->next, __ATOMIC_ACQUIRE);
if (next == nullptr) {
return nullptr;
}
*current->prev = next;
next->prev = current->prev;
current->removeRef();
current = next;
}
return *current;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment