Commit bf69c94b authored by Kenton Varda

Lockless EventLoop, but at what cost? This code got weird.

parent b2f663a0
......@@ -199,6 +199,7 @@ libkj_async_la_LIBADD = libkj.la $(PTHREAD_LIBS) -lpthread
libkj_async_la_LDFLAGS = -release $(VERSION) -no-undefined
libkj_async_la_SOURCES= \
src/kj/work-queue.h \
src/kj/work-queue.c++ \
src/kj/async.c++ \
src/kj/async-unix.c++ \
src/kj/async-io.c++
......
......@@ -48,7 +48,6 @@ AS_IF([test "$external_capnp" != "no"], [
])
AM_CONDITIONAL([USE_EXTERNAL_CAPNP], [test "$external_capnp" != "no"])
# Only used in a test. Cleanup?
AC_SEARCH_LIBS(sched_yield, rt)
LIBS="$PTHREAD_LIBS $LIBS"
......
......@@ -108,10 +108,16 @@ public:
inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
const _::WorkQueue<SignalJob>& signalQueue,
int signum)
: job(signalQueue.add(fulfiller, signum)) {}
: job(signalQueue.createJob(fulfiller, signum)) {
job->addToQueue();
}
~SignalPromiseAdapter() noexcept(false) {
job->cancel();
}
private:
Own<const SignalJob> job;
Own<_::WorkQueue<SignalJob>::JobWrapper> job;
};
class UnixEventLoop::PollJob {
......@@ -142,10 +148,16 @@ public:
inline PollPromiseAdapter(PromiseFulfiller<short>& fulfiller,
const _::WorkQueue<PollJob>& pollQueue,
int fd, short eventMask)
: job(pollQueue.add(fulfiller, fd, eventMask)) {}
: job(pollQueue.createJob(fulfiller, fd, eventMask)) {
job->addToQueue();
}
~PollPromiseAdapter() noexcept(false) {
job->cancel();
}
private:
Own<const PollJob> job;
Own<_::WorkQueue<PollJob>::JobWrapper> job;
};
UnixEventLoop::UnixEventLoop(): impl(heap<Impl>(*this)) {
......
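Both adapters above follow the same lifecycle against the new WorkQueue interface: construct the job with createJob(), publish it with addToQueue(), and cancel() it from the destructor so the queue can never fire a fulfiller that no longer exists. A minimal sketch of that shape, with a hypothetical TimerJob/TimerPromiseAdapter pair standing in for SignalJob/PollJob (nothing below is part of this commit):

#include "kj/async.h"
#include "kj/work-queue.h"

class TimerJob {
  // Hypothetical Job type; follows the complete()/cancel() contract documented in work-queue.h.
public:
  TimerJob(kj::PromiseFulfiller<int>& fulfiller, int timeoutMs)
      : fulfiller(fulfiller), timeoutMs(timeoutMs) {}
  void complete(int elapsedMs) { fulfiller.fulfill(kj::mv(elapsedMs)); }  // run by the consumer
  void cancel() {}                                                        // nothing extra to clean up
private:
  kj::PromiseFulfiller<int>& fulfiller;
  int timeoutMs;
};

class TimerPromiseAdapter {
public:
  TimerPromiseAdapter(kj::PromiseFulfiller<int>& fulfiller,
                      const kj::_::WorkQueue<TimerJob>& queue, int timeoutMs)
      : job(queue.createJob(fulfiller, timeoutMs)) {
    job->addToQueue();     // publish only after the wrapper is fully constructed
  }
  ~TimerPromiseAdapter() noexcept(false) {
    job->cancel();         // prevents complete(), or waits for an in-flight complete() to finish
  }
private:
  kj::Own<kj::_::WorkQueue<TimerJob>::JobWrapper> job;
};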
......@@ -79,14 +79,7 @@ bool EventLoop::isCurrent() const {
return threadLocalEventLoop == this;
}
void EventLoop::EventListHead::fire() {
KJ_FAIL_ASSERT("Fired event list head.");
}
EventLoop::EventLoop(): queue(*this), insertPoint(&queue) {
queue.next = &queue;
queue.prev = &queue;
}
EventLoop::EventLoop() {}
void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result) {
EventLoop* oldEventLoop = threadLocalEventLoop;
......@@ -97,36 +90,22 @@ void EventLoop::waitImpl(Own<_::PromiseNode> node, _::ExceptionOrValue& result)
event.fired = node->onReady(event);
while (!event.fired) {
queue.mutex.lock(_::Mutex::EXCLUSIVE);
KJ_IF_MAYBE(event, queue.peek(nullptr)) {
// Arrange for events armed during the event callback to be inserted at the beginning
// of the queue.
insertionPoint = nullptr;
// Get the first event in the queue.
Event* event = queue.next;
if (event == &queue) {
// No events in the queue.
// Fire the first event.
event->complete(0);
} else {
// No events in the queue. Wait for callback.
prepareToSleep();
queue.mutex.unlock(_::Mutex::EXCLUSIVE);
if (queue.peek(*this) != nullptr) {
// Whoa, new job was just added.
wake();
}
sleep();
continue;
}
// Remove it from the queue.
queue.next = event->next;
event->next->prev = &queue;
event->next = nullptr;
event->prev = nullptr;
// New events should be inserted at the beginning of the queue, but in order.
insertPoint = queue.next;
// Lock it before we unlock the queue mutex.
event->mutex.lock(_::Mutex::EXCLUSIVE);
// Now we can unlock the queue.
queue.mutex.unlock(_::Mutex::EXCLUSIVE);
// Fire the event, making sure we unlock the mutex afterwards.
KJ_DEFER(event->mutex.unlock(_::Mutex::EXCLUSIVE));
event->fire();
}
node->get(result);
......@@ -136,72 +115,35 @@ Promise<void> EventLoop::yieldIfSameThread() const {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
EventLoop::Event::~Event() noexcept(false) {
if (this != &loop.queue) {
KJ_ASSERT(next == this,
"Event destroyed while armed. You must call disarm() in the subclass's destructor "
"in order to ensure that fire() is not running when the event is destroyed.") {
break;
}
}
void EventLoop::receivedNewJob() const {
wake();
}
void EventLoop::Event::arm(bool preemptIfSameThread) {
loop.queue.mutex.lock(_::Mutex::EXCLUSIVE);
KJ_DEFER(loop.queue.mutex.unlock(_::Mutex::EXCLUSIVE));
if (next == nullptr) {
bool queueIsEmpty = loop.queue.next == &loop.queue;
if (preemptIfSameThread && threadLocalEventLoop == &loop) {
// Insert the event into the queue. We put it at the front rather than the back so that
// related events are executed together and so that increasing the granularity of events
// does not cause your code to "lose priority" compared to simultaneously-running code
// with less granularity.
next = loop.insertPoint;
prev = next->prev;
next->prev = this;
prev->next = this;
} else {
// Insert the node at the *end* of the queue.
prev = loop.queue.prev;
next = prev->next;
prev->next = this;
next->prev = this;
if (loop.insertPoint == &loop.queue) {
loop.insertPoint = this;
}
}
EventLoop::Event::Event(const EventLoop& loop)
: loop(loop),
jobs { loop.queue.createJob(*this), loop.queue.createJob(*this) } {}
if (queueIsEmpty) {
// Queue was empty previously. Make sure to wake it up if it is sleeping.
loop.wake();
}
EventLoop::Event::~Event() noexcept(false) {}
void EventLoop::Event::arm(bool preemptIfSameThread) {
EventLoop* localLoop = threadLocalEventLoop;
if (preemptIfSameThread && localLoop == &loop) {
// Insert the event into the queue. We put it at the front rather than the back so that
// related events are executed together and so that increasing the granularity of events
// does not cause your code to "lose priority" compared to simultaneously-running code
// with less granularity.
jobs[currentJob]->insertAfter(localLoop->insertionPoint, localLoop->queue);
localLoop->insertionPoint = *jobs[currentJob];
} else {
// Insert the node at the *end* of the queue.
jobs[currentJob]->addToQueue();
}
currentJob = !currentJob;
}
void EventLoop::Event::disarm() {
loop.queue.mutex.lock(_::Mutex::EXCLUSIVE);
if (next != nullptr && next != this) {
if (loop.insertPoint == this) {
loop.insertPoint = next;
}
next->prev = prev;
prev->next = next;
next = nullptr;
prev = nullptr;
}
next = this;
loop.queue.mutex.unlock(_::Mutex::EXCLUSIVE);
// Ensure that if fire() is currently running, it completes before disarm() returns.
mutex.lock(_::Mutex::EXCLUSIVE);
mutex.unlock(_::Mutex::EXCLUSIVE);
jobs[0]->cancel();
jobs[1]->cancel();
}
// =======================================================================================
......
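The jobs[2]/currentJob alternation that arm() now uses (and that the TODO in async.h apologizes for) exists so that an event's own fire() can re-arm the same event: the JobWrapper it is currently running from is still inside complete(), so the re-arm has to go through the other wrapper. A hypothetical self-rearming subclass to show the intent, assuming fire() is the overridable callback (as EventJob and EventListHead suggest) and that Event is reachable for subclassing:

void doSomeWork();   // hypothetical work function, defined elsewhere

class RepeatingEvent final: public kj::EventLoop::Event {
  // Purely illustrative; not part of this commit.
public:
  explicit RepeatingEvent(const kj::EventLoop& loop): Event(loop) {}

  void fire() override {
    doSomeWork();
    // Safe even though this call is itself running from one of the event's two JobWrappers:
    // arm() enqueues the *other* wrapper and then flips currentJob.
    arm();
  }
};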
......@@ -27,6 +27,7 @@
#include "exception.h"
#include "mutex.h"
#include "refcount.h"
#include "work-queue.h"
namespace kj {
......@@ -198,7 +199,7 @@ template <typename Func, typename T>
using PromiseForResultNoChaining = Promise<_::DisallowChain<_::ReturnType<Func, T>>>;
// Like PromiseForResult but chaining (continuations that return another promise) is now allowed.
class EventLoop {
class EventLoop: private _::NewJobCallback {
// Represents a queue of events being executed in a loop. Most code won't interact with
// EventLoop directly, but instead use `Promise`s to interact with it indirectly. See the
// documentation for `Promise`.
......@@ -222,6 +223,8 @@ class EventLoop {
// return 0;
// }
class EventJob;
public:
EventLoop();
......@@ -301,13 +304,13 @@ public:
// conditions.
public:
Event(const EventLoop& loop): loop(loop), next(nullptr), prev(nullptr) {}
Event(const EventLoop& loop);
~Event() noexcept(false);
KJ_DISALLOW_COPY(Event);
void arm(bool preemptIfSameThread = true);
// Enqueue this event so that run() will be called from the event loop soon. Does nothing
// if the event is already armed.
// Enqueue this event so that run() will be called from the event loop soon. It is an error
// to call this when the event is already armed.
//
// If called from the event loop's own thread (i.e. from within an event handler fired from
// this event loop), and `preemptIfSameThread` is true, the event will be scheduled
......@@ -336,13 +339,11 @@ public:
private:
friend class EventLoop;
const EventLoop& loop;
Event* next; // if == this, disarm() has been called.
Event* prev;
mutable kj::_::Mutex mutex;
// Hack: The mutex on the list head is treated as protecting the next/prev links across the
// whole list. The mutex on each Event other than the head is treated as protecting that
// event's armed/disarmed state.
// TODO(cleanup): This is dumb. We allocate two jobs and alternate between them so that an
// event's fire() can re-arm itself without deadlocking or causing other trouble.
Own<_::WorkQueue<EventJob>::JobWrapper> jobs[2];
uint currentJob = 0;
};
protected:
......@@ -365,21 +366,21 @@ protected:
// is armed; it should return quickly if the loop isn't prepared to sleep.
private:
class EventListHead: public Event {
class EventJob {
public:
inline EventListHead(EventLoop& loop): Event(loop) {}
void fire() override; // throws
EventJob(Event& event): event(event) {}
inline void complete(int dummyArg) { event.fire(); }
inline void cancel() {}
private:
Event& event;
};
mutable EventListHead queue;
// Head of the event list. queue.mutex protects all next/prev pointers across the list, as well
// as `insertPoint`. Each actual event's mutex protects its own `fire()` callback.
_::WorkQueue<EventJob> queue;
mutable Event* insertPoint;
// The next event after the one that is currently firing. New events are inserted just before
// this event. When the fire callback completes, the loop continues at the beginning of the
// queue -- thus, it starts by running any events that were just enqueued by the previous
// callback. This keeps related events together.
Maybe<_::WorkQueue<EventJob>::JobWrapper&> insertionPoint;
// Where to insert preemptively-scheduled events into the queue.
template <typename T, typename Func, typename ErrorFunc>
Own<_::PromiseNode> thereImpl(Promise<T>&& promise, Func&& func, ErrorFunc&& errorHandler) const;
......@@ -393,6 +394,9 @@ private:
// currently on the queue are fired. Otherwise, returns an already-resolved promise. Used to
// implement evalLater().
void receivedNewJob() const override;
// Implements NewJobCallback.
template <typename>
friend class Promise;
};
......@@ -1272,9 +1276,11 @@ public:
}
private:
Adapter adapter;
ExceptionOr<T> result;
bool waiting = true;
Adapter adapter;
// `adapter` must come last so that it is destroyed first, since fulfill() could be called up
// until that point.
void fulfill(T&& value) override {
if (waiting) {
......
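The reordering of `adapter` to the end of the member list at the bottom of this file leans on a plain C++ rule: non-static members are destroyed in reverse declaration order, so the member declared last is destroyed first. Since `adapter` may call fulfill(), which touches `result` and `waiting`, it must be gone before they are. A tiny standalone illustration of the rule (all names made up):

#include <cstdio>

struct Loud {
  const char* name;
  explicit Loud(const char* n): name(n) {}
  ~Loud() { std::printf("destroying %s\n", name); }
};

struct Holder {
  Loud result{"result"};
  Loud waiting{"waiting"};
  Loud adapter{"adapter"};   // declared last => destroyed first
};

int main() {
  Holder h;
  // On return, prints: destroying adapter / destroying waiting / destroying result.
  return 0;
}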
......@@ -149,28 +149,68 @@ void Once::runOnce(Initializer& init) {
}
} else {
for (;;) {
if (state == INITIALIZED) {
if (state == INITIALIZED || state == DISABLED) {
break;
} else if (state == INITIALIZING) {
// Initialization is taking place in another thread. Indicate that we're waiting.
if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
__ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
__ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
// State changed, retry.
continue;
}
} else {
KJ_DASSERT(state == INITIALIZING_WITH_WAITERS);
}
// Wait for initialization.
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS, NULL, NULL, 0);
state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
}
}
}
void Once::reset() {
uint state = INITIALIZED;
if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED,
false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
KJ_REQUIRE(state == DISABLED, "reset() called while not initialized.");
}
}
void Once::disable() noexcept {
uint state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
for (;;) {
switch (state) {
case DISABLED:
default:
return;
case UNINITIALIZED:
case INITIALIZED:
// Try to transition the state to DISABLED.
if (!__atomic_compare_exchange_n(&futex, &state, DISABLED, true,
__ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
// State changed, retry.
continue;
}
// Success.
return;
case INITIALIZING:
// Initialization is taking place in another thread. Indicate that we're waiting.
if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true,
__ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) {
// State changed, retry.
continue;
}
// no break
// The docs for __atomic_compare_exchange_n claim that the memmodel for the failure case cannot
// be stronger than the success case. That's disappointing, because what we really want is
// for the two cmpxchg calls above to do an acquire barrier in the failure case only, while
// being relaxed if successful, so that once the state is INITIALIZED we know we've acquired
// it. Oh well, we'll just do an acquire barrier on the way out instead.
KJ_ASSERT(__atomic_load_n(&futex, __ATOMIC_ACQUIRE) == INITIALIZED);
case INITIALIZING_WITH_WAITERS:
// Wait for initialization.
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS, NULL, NULL, 0);
state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
break;
}
}
}
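Read together, the futex-based runOnce(), reset() and disable() above form a small state machine over the futex word. A hedged summary of the transitions as implied by this diff (the uncontended runOnce() path is not shown in the hunk and is inferred):

  UNINITIALIZED --runOnce()--> INITIALIZING --initializer finishes--> INITIALIZED
  INITIALIZING --another thread starts waiting--> INITIALIZING_WITH_WAITERS --finish + FUTEX_WAKE--> INITIALIZED
  INITIALIZED --reset()--> UNINITIALIZED
  UNINITIALIZED or INITIALIZED --disable()--> DISABLED (terminal)
  INITIALIZING / INITIALIZING_WITH_WAITERS --disable()--> block until the initializer finishes, then DISABLED

Once DISABLED, runOnce() and reset() are no-ops and isInitialized() stays false.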
......@@ -236,7 +276,7 @@ void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
}
}
Once::Once(): initialized(false) {
Once::Once(bool startInitialized): state(startInitialized ? INITIALIZED : UNINITIALIZED) {
KJ_PTHREAD_CALL(pthread_mutex_init(&mutex, nullptr));
}
Once::~Once() {
......@@ -247,13 +287,28 @@ void Once::runOnce(Initializer& init) {
KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));
if (initialized) {
if (state != UNINITIALIZED) {
return;
}
init.run();
__atomic_store_n(&initialized, true, __ATOMIC_RELEASE);
__atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE);
}
void Once::reset() {
State oldState = INITIALIZED;
if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED,
false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
KJ_REQUIRE(oldState == DISABLED, "reset() called while not initialized.");
}
}
void Once::disable() noexcept {
KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex));
KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex)));
__atomic_store_n(&state, DISABLED, __ATOMIC_RELAXED);
}
#endif
......
......@@ -26,7 +26,7 @@
#include "memory.h"
#if __linux__ && !defined(KJ_FUTEX)
#if __linux__ && !defined(KJ_USE_FUTEX)
#define KJ_USE_FUTEX 1
#endif
......@@ -87,9 +87,10 @@ class Once {
public:
#if KJ_USE_FUTEX
inline Once(): futex(UNINITIALIZED) {}
inline Once(bool startInitialized = false)
: futex(startInitialized ? INITIALIZED : UNINITIALIZED) {}
#else
Once();
Once(bool startInitialized = false);
~Once();
#endif
KJ_DISALLOW_COPY(Once);
......@@ -106,7 +107,26 @@ public:
#if KJ_USE_FUTEX
return __atomic_load_n(&futex, __ATOMIC_ACQUIRE) == INITIALIZED;
#else
return __atomic_load_n(&initialized, __ATOMIC_ACQUIRE);
return __atomic_load_n(&state, __ATOMIC_ACQUIRE) == INITIALIZED;
#endif
}
void reset();
// Resets the state from initialized to uninitialized. It is an error to call this when
// not already initialized, or when runOnce() or isInitialized() might be called concurrently in
// another thread.
void disable() noexcept;
// Prevent future calls to runOnce() and reset() from having any effect, and make isInitialized()
// return false forever. If an initializer is currently running, block until it completes.
bool isDisabled() noexcept {
// Returns true if `disable()` has been called.
#if KJ_USE_FUTEX
return __atomic_load_n(&futex, __ATOMIC_ACQUIRE) == DISABLED;
#else
return __atomic_load_n(&state, __ATOMIC_ACQUIRE) == DISABLED;
#endif
}
......@@ -114,13 +134,21 @@ private:
#if KJ_USE_FUTEX
uint futex;
static constexpr uint UNINITIALIZED = 0;
static constexpr uint INITIALIZING = 1;
static constexpr uint INITIALIZING_WITH_WAITERS = 2;
static constexpr uint INITIALIZED = 3;
enum State {
UNINITIALIZED,
INITIALIZING,
INITIALIZING_WITH_WAITERS,
INITIALIZED,
DISABLED
};
#else
bool initialized;
enum State {
UNINITIALIZED,
INITIALIZED,
DISABLED
};
State state;
pthread_mutex_t mutex;
#endif
};
......
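Putting the widened Once interface together, here is a minimal single-threaded usage sketch. It assumes Once::Initializer with a virtual run(), as used by runOnce() above; the BuildCache name is made up:

#include <kj/mutex.h>
#include <kj/debug.h>

class BuildCache final: public kj::_::Once::Initializer {
public:
  void run() override {
    // one-time, possibly expensive setup goes here
  }
};

void example() {
  kj::_::Once once;                  // starts UNINITIALIZED; Once(true) would start INITIALIZED
  BuildCache init;

  once.runOnce(init);                // first caller runs init.run(); later callers see INITIALIZED
  KJ_ASSERT(once.isInitialized());

  once.reset();                      // back to UNINITIALIZED, so a later runOnce() runs again
  once.runOnce(init);

  once.disable();                    // terminal: runOnce()/reset() now have no effect...
  KJ_ASSERT(once.isDisabled());
  KJ_ASSERT(!once.isInitialized());  // ...and isInitialized() reports false forever
}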
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "work-queue.h"
#include <sched.h>
namespace kj {
namespace _ { // private
void yieldThread() {
sched_yield();
}
} // namespace _ (private)
} // namespace kj
......@@ -30,6 +30,8 @@
namespace kj {
namespace _ { // private
void yieldThread();
class NewJobCallback {
public:
virtual void receivedNewJob() const = 0;
......@@ -46,8 +48,13 @@ class WorkQueue {
// before complete() gets a chance to be called. Often, cancel() does nothing. Either
// complete() or cancel() will have been called and returned before Own<Job>'s destructor
// finishes; the destructor will block if complete() is still running in another thread.
//
// This class is extraordinarily difficult to use correctly and therefore remains private for
// now.
public:
~WorkQueue() noexcept(false);
class JobWrapper final: private Disposer {
// Holds one Job in the queue.
......@@ -55,8 +62,10 @@ public:
const Job& get() const;
// Get the wrapped Job.
// methods called by the consumer ------------------------------------------
Maybe<JobWrapper&> getNext();
// Get the next job in the queue. Returns nullptr if there are no more jobs. Removes any
// Gets the next job in the queue. Returns nullptr if there are no more jobs. Removes any
// jobs in the list that are already canceled.
template <typename Param>
......@@ -69,10 +78,36 @@ public:
// params. However, GCC 4.7 and 4.8 appear to get confused by param packs used inside
// lambdas. Clang handles it fine.
// methods called by the producer ------------------------------------------
void addToQueue();
// Add the job to the queue.
//
// It is an error to call this if the job is already in the queue. However, once complete()
// has been called on the underlying `Job`, `addToQueue()` can be called again (including
// recursively during `complete()`). On the other hand, it is *not* safe to call `addToQueue()`
// after `cancel()`.
void cancel();
// Cancels the job before completion, preventing complete() from being called. If complete()
// is already being called, blocks until it finishes. Causes subsequent calls to `addToQueue()`
// to have no effect.
// methods called by the single producer/consumer --------------------------
void insertAfter(Maybe<JobWrapper&> position, WorkQueue& queue);
// Like `addToQueue()`, but inserts the job into the queue after `position`, or at the beginning of
// the queue if `position` is null. This can only be used if the calling thread is also the
// consuming thread.
//
// The non-const WorkQueue must be provided to allow thread-unsafe access.
private:
const WorkQueue& queue;
Job job;
mutable _::Once once;
mutable uint refcount = 2;
mutable uint refcount = 1;
JobWrapper* next = nullptr;
// The JobWrapper cannot be destroyed until this pointer becomes non-null.
......@@ -83,7 +118,7 @@ public:
friend class WorkQueue;
template <typename... Params>
JobWrapper(Params&&... params);
JobWrapper(const WorkQueue& queue, Params&&... params);
void disposeImpl(void* pointer) const override;
// Called when the listener discards its Own<Job>.
......@@ -94,6 +129,11 @@ public:
//
// `drop()` is also responsible for calling the job's destructor.
typename WorkQueue<Job>::JobWrapper* unlink();
// Safely removes the job from the queue. Only the consumer can call this. Does not call
// removeRef(); the caller must arrange to call it afterwards. Returns the following item
// in the queue, if any.
void removeRef() const;
template <typename Func>
......@@ -118,9 +158,9 @@ public:
// you are sure that you don't need to be notified, pass null for the callback.
template <typename... Params>
Own<const Job> add(Params&&... params) const;
// Adds a job to the list, passing the given parameters to its constructor. Can be called
// from any thread.
Own<JobWrapper> createJob(Params&&... params) const;
// Creates a new job, passing the given parameters to the `Job` constructor, but does not yet
// add the job to the queue. Call addToQueue() on the `JobWrapper` to do so.
//
// If the caller discards the returned pointer before the job completes, it will be canceled.
// Either the job's complete() or cancel() method will be called exactly once and will have
......@@ -141,8 +181,21 @@ private:
mutable NewJobCallback* newJobCallback = nullptr;
// If non-null, run this the next time `add()` is called, then set null.
void notifyNewJob() const;
// Atomically call the new job callback (if non-null) and then set it to null.
};
template <typename Job>
WorkQueue<Job>::~WorkQueue() noexcept(false) {
JobWrapper* ptr = head;
while (ptr != nullptr) {
JobWrapper* next = ptr->next;
ptr->removeRef();
ptr = next;
}
}
template <typename Job>
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::peek(
kj::Maybe<NewJobCallback&> callback) {
......@@ -162,27 +215,22 @@ Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::peek(
template <typename Job>
template <typename... Params>
Own<const Job> WorkQueue<Job>::add(Params&&... params) const {
JobWrapper* job = new JobWrapper(kj::fwd<Params>(params)...);
Own<const Job> result(&job->job, *job);
job->prev = __atomic_load_n(&tail, __ATOMIC_ACQUIRE);
while (!__atomic_compare_exchange_n(&tail, &job->prev, &job->next, true,
__ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
// Oops, someone else updated the tail pointer. Try again.
}
__atomic_store_n(job->prev, job, __ATOMIC_RELEASE);
Own<typename WorkQueue<Job>::JobWrapper> WorkQueue<Job>::createJob(Params&&... params) const {
JobWrapper* job = new JobWrapper(*this, kj::fwd<Params>(params)...);
return Own<JobWrapper>(job, *job);
}
template <typename Job>
void WorkQueue<Job>::notifyNewJob() const {
// Check if there is a NewJobCallback.
if (__atomic_load_n(&newJobCallback, __ATOMIC_RELAXED) != nullptr) {
// There is a callback. Do an atomic exchange to acquire it.
// There is a callback. Atomically acquire it so that it is only called once.
NewJobCallback* callback = __atomic_exchange_n(&newJobCallback, nullptr, __ATOMIC_ACQUIRE);
if (callback != nullptr) {
// Acquired by this thread. Call it.
callback->receivedNewJob();
}
}
return kj::mv(result);
}
template <typename Job>
......@@ -203,34 +251,63 @@ Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::getNext(
template <typename Job>
template <typename Param>
Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::complete(Param&& param) {
doOnce([&]() {
const_cast<Job&>(job).complete(kj::fwd<Param>(param));
});
// Remove from the queue first, so that it can be re-added during the callback.
JobWrapper* result = unlink();
// Run the callback.
{
KJ_DEFER(removeRef());
doOnce([&]() {
const_cast<Job&>(job).complete(kj::fwd<Param>(param));
});
}
JobWrapper* nextCopy = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
if (nextCopy == nullptr) {
// Return the next job.
if (result == nullptr) {
return nullptr;
} else {
return result->skipDead();
}
*prev = nextCopy;
nextCopy->prev = prev;
removeRef();
return nextCopy->skipDead();
}
template <typename Job>
template <typename... Params>
inline WorkQueue<Job>::JobWrapper::JobWrapper(Params&&... params)
: job(kj::fwd<Params>(params)...) {}
inline WorkQueue<Job>::JobWrapper::JobWrapper(const WorkQueue& queue, Params&&... params)
: queue(queue), job(kj::fwd<Params>(params)...), once(true) {}
template <typename Job>
void WorkQueue<Job>::JobWrapper::disposeImpl(void* pointer) const {
if (!once.isInitialized()) {
doOnce([this]() { const_cast<Job&>(job).cancel(); });
}
KJ_IREQUIRE(once.isDisabled());
removeRef();
}
template <typename Job>
typename WorkQueue<Job>::JobWrapper* WorkQueue<Job>::JobWrapper::unlink() {
JobWrapper* result;
result = *prev = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
if (result == nullptr) {
// Oops, we're at the end of the queue. We need to back up the tail pointer.
JobWrapper** oldTail = &next;
if (!__atomic_compare_exchange_n(&queue.tail, &oldTail, prev, false,
__ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
// Crap, another thread is busy inserting. Gotta wait for it.
do {
yieldThread();
result = *prev = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
} while (result == nullptr);
result->prev = prev;
}
} else {
result->prev = prev;
}
next = nullptr;
prev = nullptr;
return result;
}
template <typename Job>
void WorkQueue<Job>::JobWrapper::removeRef() const {
if (__atomic_sub_fetch(&refcount, 1, __ATOMIC_ACQ_REL) == 0) {
......@@ -262,19 +339,85 @@ Maybe<typename WorkQueue<Job>::JobWrapper&> WorkQueue<Job>::JobWrapper::skipDead
while (current->once.isInitialized()) {
// This job has already been taken care of. Remove it.
JobWrapper* next = __atomic_load_n(&current->next, __ATOMIC_ACQUIRE);
if (next == nullptr) {
current = current->unlink();
if (current == nullptr) {
return nullptr;
}
*current->prev = next;
next->prev = current->prev;
current->removeRef();
current = next;
}
return *current;
}
template <typename Job>
void WorkQueue<Job>::JobWrapper::addToQueue() {
KJ_IREQUIRE(prev == nullptr, "Job is already in the queue.");
once.reset();
// Increment refcount to indicate insertion into the queue.
__atomic_add_fetch(&refcount, 1, __ATOMIC_RELAXED);
// Atomically set our `prev` to the tail pointer, and make the tail pointer point to our `next`.
prev = __atomic_load_n(&queue.tail, __ATOMIC_ACQUIRE);
while (!__atomic_compare_exchange_n(&queue.tail, &prev, &next, true,
__ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
// Oops, someone else updated the tail pointer. Try again.
}
// Update the target of the prev pointer to point back to us. This inserts the job into the
// list; the consumer thread may now consume it.
__atomic_store_n(prev, this, __ATOMIC_RELEASE);
queue.notifyNewJob();
}
template <typename Job>
void WorkQueue<Job>::JobWrapper::cancel() {
once.disable();
}
template <typename Job>
void WorkQueue<Job>::JobWrapper::insertAfter(Maybe<JobWrapper&> position, WorkQueue& queue) {
KJ_IREQUIRE(prev == nullptr, "Job is already in the queue.");
once.reset();
// Increment refcount to indicate insertion into the queue.
__atomic_add_fetch(&refcount, 1, __ATOMIC_RELAXED);
KJ_IF_MAYBE(p, position) {
KJ_IREQUIRE(p->prev != nullptr, "position is not in the queue");
prev = &p->next;
} else {
prev = &queue.head;
}
next = __atomic_load_n(prev, __ATOMIC_ACQUIRE);
if (next == nullptr) {
// It appears we're trying to insert at the end of the queue, so we could interfere with other
// threads. We need to update the tail pointer to claim our spot.
if (!__atomic_compare_exchange_n(&queue.tail, &prev, &next, false,
__ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
// Oh, someone else claimed the spot. Our job needs to be inserted ahead of theirs, so
// we'll need to spin until they finish linking their job into the list.
do {
yieldThread();
next = __atomic_load_n(prev, __ATOMIC_ACQUIRE);
} while (next == nullptr);
next->prev = &next;
}
} else {
next->prev = &next;
}
// No more thread interference.
*prev = this;
queue.notifyNewJob();
}
} // namespace _ (private)
} // namespace kj
......
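The heart of the lockless design above is the multi-producer append used by addToQueue() and insertAfter(): the queue's `tail` always points at the `next` link that the next producer must fill in, so a producer claims its slot by CAS-ing `tail` over to its own `next`, and only then publishes itself by writing the claimed link. A standalone toy version of just that push step, using the same GCC atomic builtins (Node, ToyQueue and toyPush are illustrative, not part of kj):

#include <cstdio>

struct Node {
  Node* next = nullptr;
  int value;
  explicit Node(int v): value(v) {}
};

struct ToyQueue {
  Node* head = nullptr;
  Node** tail = &head;   // always points at the link the next producer must fill in
};

void toyPush(ToyQueue& q, Node* n) {
  // Step 1: claim a slot by atomically swinging `tail` from wherever it was to &n->next.
  Node** prevLink = __atomic_load_n(&q.tail, __ATOMIC_ACQUIRE);
  while (!__atomic_compare_exchange_n(&q.tail, &prevLink, &n->next, true,
                                      __ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
    // Another producer moved the tail first; prevLink was refreshed, so just retry.
  }
  // Step 2: publish the node by filling in the link we claimed. Between steps 1 and 2 the
  // consumer can observe a null link at the end of the list.
  __atomic_store_n(prevLink, n, __ATOMIC_RELEASE);
}

int main() {
  ToyQueue q;
  Node a(1), b(2);
  toyPush(q, &a);
  toyPush(q, &b);
  for (Node* p = q.head; p != nullptr; p = p->next) {
    std::printf("%d\n", p->value);   // prints 1 then 2
  }
}

The window between the CAS and the publishing store is the reason the consumer-side unlink() and insertAfter() above spin with yieldThread() when they find a null link where a node has to exist.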
linux-gcc-4.7 1722 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1725 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1745 ./super-test.sh tmpdir capnp-clang quick clang
linux-gcc-4.7 1731 ./super-test.sh tmpdir capnp-gcc-4.7 quick
linux-gcc-4.8 1734 ./super-test.sh tmpdir capnp-gcc-4.8 quick gcc-4.8
linux-clang 1754 ./super-test.sh tmpdir capnp-clang quick clang
mac 802 ./super-test.sh remote beat caffeinate quick
cygwin 805 ./super-test.sh remote Kenton@flashman quick
cygwin 810 ./super-test.sh remote Kenton@flashman quick