// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2015 Baidu, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Zhangyi Chen (chenzhangyi01@baidu.com)
// Date: 2015/10/23 18:16:16


#include "bthread/bthread.h"
#include "butil/type_traits.h"
namespace bthread {

// ExecutionQueue is a special wait-free MPSC queue of which the consumer thread
// is auto started by the execute operation and auto quits if there are no more
// tasks; in other words, there isn't a daemon bthread waiting to consume tasks.

// Forward declarations of the types referenced by the declarations below.
struct TaskNode;
class ExecutionQueueBase;
template <typename T>
struct ExecutionQueueId;
template <typename T>
class ExecutionQueue;
// Non-template base class of TaskIterator<T>. It carries the iteration state
// and the "queue stopped" flag; ExecutionQueueBase is a friend and may touch
// every member.
class TaskIteratorBase {
friend class ExecutionQueueBase;
public:
    // Returns true when the ExecutionQueue is stopped and there will never be
    // more tasks, so all the related resources can safely be released from
    // then on.
    bool is_queue_stopped() const { return _is_stopped; }

    // Conversion evaluated by loops such as `for (; iter; ++iter)'.
    // Defined out of line.
    operator bool() const;

protected:
    // NOTE(review): the members in this section are exposed as protected on
    // the assumption that TaskIterator<T> needs them -- confirm against
    // bthread/execution_queue_inl.h.

    // Stores the arguments, clears |_should_break| and |_num_iterated|, and
    // then calls operator++() once.
    TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
                     bool is_stopped, bool high_priority)
        : _cur_node(head)
        , _head(head)
        , _q(queue)
        , _is_stopped(is_stopped)
        , _high_priority(high_priority)
        , _should_break(false)
        , _num_iterated(0)
    { operator++(); }

    // Defined out of line.
    void operator++();

    // The node the iterator currently refers to.
    TaskNode* cur_node() const { return _cur_node; }

private:
    // Current value of the |_num_iterated| counter.
    int num_iterated() const { return _num_iterated; }

    // Defined out of line.
    bool should_break_for_high_priority_tasks();

    TaskNode*               _cur_node;
    TaskNode*               _head;
    ExecutionQueueBase*     _q;
    bool                    _is_stopped;
    bool                    _high_priority;
    bool                    _should_break;
    int                     _num_iterated;
};

// Iterate over the given tasks.
//
// Example:
// int demo_execute(void* meta, TaskIterator<T>& iter) {
//     if (iter.is_queue_stopped()) {
//         // destroy meta and related resources
//         return 0;
//     }
//     for (; iter; ++iter) {
//         // do_something(*iter)
//         // or do_something(iter->a_member_of_T)
//     }
//     return 0;
// }
template <typename T>
class TaskIterator : public TaskIteratorBase {
public:
    typedef T*          pointer;
    typedef T&          reference;

    // Access the current task, as in `do_something(*iter)'.
    // Defined out of line.
    reference operator*() const;

    // Member access on the current task; forwards to operator*().
    pointer operator->() const { return &(operator*()); }

    // Pre- and post-increment, both defined out of line. The postfix form
    // returns nothing.
    TaskIterator& operator++();
    void operator++(int);
};

// Handle of a submitted task: execution_queue_execute() can fill one in and
// execution_queue_cancel() accepts one.
struct TaskHandle {
    TaskNode* node;
    // NOTE(review): presumably distinguishes reuses of |node|, since
    // execution_queue_cancel is documented as ABA free -- confirm.
    int64_t version;
};

// Per-task options accepted by execution_queue_execute.
struct TaskOptions {
    // Defined out of line; the parameters are named after the two fields
    // below.
    TaskOptions(bool high_priority, bool in_place_if_possible);

    // Executor would execute high-priority tasks in the FIFO order but before
    // all pending normal-priority tasks.
    // NOTE: We don't guarantee any kind of real-time as there might be tasks
    // still in process which are uninterruptible.
    // Default: false
    bool high_priority;

    // If |in_place_if_possible| is true, execution_queue_execute would call
    // execute immediately instead of starting a bthread if possible.
    // Note: Running callbacks in place might cause deadlocks, so be very
    // careful when turning this flag on.
    // Default: false
    bool in_place_if_possible;
};

// Ready-made TaskOptions(high_priority, in_place_if_possible) presets. Pass the
// address of one of them as |options| of execution_queue_execute.
static const TaskOptions TASK_OPTIONS_NORMAL(false, false);   // neither flag set
static const TaskOptions TASK_OPTIONS_URGENT(true, false);    // high_priority
static const TaskOptions TASK_OPTIONS_INPLACE(false, true);   // in_place_if_possible

// Options accepted by execution_queue_start.
struct ExecutionQueueOptions {
    // Attribute of the bthread which execute runs on
    // default: BTHREAD_ATTR_NORMAL
    bthread_attr_t bthread_attr;
};

// Starts an ExecutionQueue.
// A NULL |options| creates the queue with the default options.
// |execute| is the callback that consumes tasks through a TaskIterator<T>.
// Returns 0 on success, errno otherwise.
// NOTE: type |T| may be non-POD, but it must be copy-constructible.
template <typename T>
int execution_queue_start(ExecutionQueueId<T>* id,
                          const ExecutionQueueOptions* options,
                          int (*execute)(void* meta, TaskIterator<T>& iter),
                          void* meta);

// Stops the ExecutionQueue.
// Once this function has been called:
//  - every later call to execution_queue_execute fails immediately;
//  - after all the pending tasks have been executed, the executor calls
//    |execute| exactly once with TaskIterator::is_queue_stopped() being true.
//    From that point on it is ok to release the resource referenced by |meta|.
// Returns 0 on success, errno otherwise.
template <typename T>
int execution_queue_stop(
        ExecutionQueueId<T> id);

// Waits until the stop task (the one for which Iterator::is_queue_stopped()
// returns true) has been executed.
template <typename T>
int execution_queue_join(
        ExecutionQueueId<T> id);

// Thread-safe and wait-free.
// Executes |task| with the default TaskOptions (normal task).
template <typename T>
int execution_queue_execute(
        ExecutionQueueId<T> id,
        typename butil::add_const_reference<T>::type task);
// Thread-safe and Wait-free.
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
// If |handle| is not NULL, we will assign it with the hanlder of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, 
                            typename butil::add_const_reference<T>::type task,
gejun's avatar
gejun committed
178 179 180
                            const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id, 
                            typename butil::add_const_reference<T>::type task,
gejun's avatar
gejun committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
                            const TaskOptions* options,
                            TaskHandle* handle);

// [Thread safe and ABA free] Cancels the task corresponding to |h|.
// Returns:
//  -1: the task was already executed, or |h| is an invalid handle
//   0: success
//   1: the task is executing
int execution_queue_cancel(
        const TaskHandle& h);

// Thread-safe and wait-free.
// Addresses a reference to the ExecutionQueue if |id| refers to a valid one.
//
// |execution_queue_execute| internally fetches a reference to the
// ExecutionQueue at the beginning and releases it at the end, which makes 2
// additional cache updates. In critical situations where that overhead
// matters, you can avoid it by addressing the reference at the beginning of
// every producer and executing tasks directly through the reference instead
// of through |id|.
//
// Note: this makes |execution_queue_stop| a little more complicated at the
// user level, as the `stop task' is not passed to |execute| until no one
// holds any reference.
// If you are not sure about the ownership of the return value (whose
// destructor releases the reference to the ExecutionQueue) and don't care
// that much about the overhead of ExecutionQueue, DON'T use this function.
template <typename T>
typename ExecutionQueue<T>::scoped_ptr_t execution_queue_address(
        ExecutionQueueId<T> id);

}  // namespace bthread

#include "bthread/execution_queue_inl.h"