// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2012 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

#ifndef BTHREAD_TASK_GROUP_H
#define BTHREAD_TASK_GROUP_H

#include "butil/time.h"                             // cpuwide_time_ns
#include "bthread/task_control.h"
#include "bthread/task_meta.h"                     // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h"           // WorkStealingQueue
#include "bthread/remote_task_queue.h"             // RemoteTaskQueue
#include "butil/resource_pool.h"                    // ResourceId
#include "bthread/parking_lot.h"

namespace bthread {

// For exiting a bthread.
class ExitException : public std::exception {
    // Opaque pointer handed to the constructor; never dereferenced here.
    void* _value;

public:
    // Wraps `value' so that whoever catches the exception can read it back.
    explicit ExitException(void* value)
        : _value(value)
    {}

    ~ExitException() throw() {}

    // The pointer that was passed to the constructor.
    void* value() const { return _value; }

    // Fixed description of this exception type.
    const char* what() const throw() { return "ExitException"; }
};

// Thread-local group of tasks.
// Notice that most methods involving context switching are static, otherwise
// pointer `this' may change after wakeup. The **pg parameters in the following
// functions are updated before returning.
class TaskGroup {
public:
    // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
    // the identifier into `tid'. Switch to the new task and schedule old task
    // to run.
    // Return 0 on success, errno otherwise.
    static int start_foreground(TaskGroup** pg,
                                bthread_t* __restrict tid,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg);

    // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
    // identifier into `tid'. Schedule the new thread to run.
65 66
    //   Called from worker: start_background<false>
    //   Called from non-worker: start_background<true>
gejun's avatar
gejun committed
67
    // Return 0 on success, errno otherwise.
68
    template <bool REMOTE>
gejun's avatar
gejun committed
69 70 71 72 73 74 75 76 77 78
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg);

    // Suspend caller and run next bthread in TaskGroup *pg.
    static void sched(TaskGroup** pg);
    static void ending_sched(TaskGroup** pg);

    // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
79 80
    // Purpose of this function is to avoid pushing `next_tid' to _rq and
    // then being popped by sched(pg), which is not necessary.
gejun's avatar
gejun committed
81
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
82 83 84 85 86 87
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
    static void exchange(TaskGroup** pg, bthread_t next_tid);

    // The callback will be run in the beginning of next-run bthread.
    // Can't be called by current bthread directly because it often needs
    // the target to be suspended already.
88 89
    typedef void (*RemainedFn)(void*);
    void set_remained(RemainedFn cb, void* arg) {
90 91 92
        _last_context_remained = cb;
        _last_context_remained_arg = arg;
    }
gejun's avatar
gejun committed
93 94 95 96 97
    
    // Suspend caller for at least |timeout_us| microseconds.
    // If |timeout_us| is 0, this function does nothing.
    // If |group| is NULL or current thread is non-bthread, call usleep(3)
    // instead. This function does not create thread-local TaskGroup.
98
    // Returns: 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
99 100
    static int usleep(TaskGroup** pg, uint64_t timeout_us);

101 102
    // Suspend caller and run another bthread. When the caller will resume
    // is undefined.
103
    static void yield(TaskGroup** pg);
gejun's avatar
gejun committed
104 105 106 107 108 109 110 111 112 113 114 115

    // Suspend caller until bthread `tid' terminates.
    static int join(bthread_t tid, void** return_value);

    // Returns true iff the bthread `tid' still exists. Notice that it is
    // just the result at this very moment which may change soon.
    // Don't use this function unless you have to. Never write code like this:
    //    if (exists(tid)) {
    //        Wait for events of the thread.   // Racy, may block indefinitely.
    //    }
    static bool exists(bthread_t tid);

116 117
    // Put attribute associated with `tid' into `*attr'.
    // Returns 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
118
    static int get_attr(bthread_t tid, bthread_attr_t* attr);
119

120 121 122
    // Get/set TaskMeta.stop of the tid.
    static void set_stopped(bthread_t tid);
    static bool is_stopped(bthread_t tid);
gejun's avatar
gejun committed
123

124
    // The bthread running run_main_task();
gejun's avatar
gejun committed
125 126 127 128 129
    bthread_t main_tid() const { return _main_tid; }
    TaskStatistics main_stat() const;
    // Routine of the main task which should be called from a dedicated pthread.
    void run_main_task();

130
    // Meta/Identifier of current task in this group.
gejun's avatar
gejun committed
131 132
    TaskMeta* current_task() const { return _cur_meta; }
    bthread_t current_tid() const { return _cur_meta->tid; }
133 134
    // Uptime of current task in nanoseconds.
    int64_t current_uptime_ns() const
135
    { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
gejun's avatar
gejun committed
136

137 138 139 140
    // True iff current task is the one running run_main_task()
    bool is_current_main_task() const { return current_tid() == _main_tid; }
    // True iff current task is in pthread-mode.
    bool is_current_pthread_task() const
gejun's avatar
gejun committed
141
    { return _cur_meta->stack == _main_stack; }
gejun's avatar
gejun committed
142

143 144
    // Active time in nanoseconds spent by this TaskGroup.
    int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
gejun's avatar
gejun committed
145

146
    // Push a bthread into the runqueue
147
    void ready_to_run(bthread_t tid, bool nosignal = false);
148 149
    // Flush tasks pushed to rq but signalled.
    void flush_nosignal_tasks();
gejun's avatar
gejun committed
150

151 152
    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);
153
    void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
154 155 156 157 158 159 160 161
    void flush_nosignal_tasks_remote();

    // Automatically decide the caller is remote or local, and call
    // the corresponding function.
    void ready_to_run_general(bthread_t tid, bool nosignal = false);
    void flush_nosignal_tasks_general();

    // The TaskControl that this TaskGroup belongs to.
gejun's avatar
gejun committed
162 163 164 165 166
    TaskControl* control() const { return _control; }

    // Call this instead of delete.
    void destroy_self();

167 168 169
    // Wake up blocking ops in the thread.
    // Returns 0 on success, errno otherwise.
    static int interrupt(bthread_t tid, TaskControl* c);
gejun's avatar
gejun committed
170

171 172
    // Get the meta associate with the task.
    static TaskMeta* address_meta(bthread_t tid);
gejun's avatar
gejun committed
173

174 175 176
    // Push a task into _rq, if _rq is full, retry after some time. This
    // process make go on indefinitely.
    void push_rq(bthread_t tid);
gejun's avatar
gejun committed
177 178

private:
179 180
friend class TaskControl;

gejun's avatar
gejun committed
181 182 183 184 185 186 187 188 189 190 191
    // You shall use TaskControl::create_group to create new instance.
    explicit TaskGroup(TaskControl*);

    int init(size_t runqueue_capacity);

    // You shall call destroy_self() instead of destructor because deletion
    // of groups are postponed to avoid race.
    ~TaskGroup();

    static void task_runner(intptr_t skip_remained);

192
    // Callbacks for set_remained()
gejun's avatar
gejun committed
193
    static void _release_last_context(void*);
194
    static void _add_sleep_event(void*);
195 196 197 198
    struct ReadyToRunArgs {
        bthread_t tid;
        bool nosignal;
    };
gejun's avatar
gejun committed
199
    static void ready_to_run_in_worker(void*);
200 201 202 203 204 205 206 207 208 209 210
    static void ready_to_run_in_worker_ignoresignal(void*);

    // Wait for a task to run.
    // Returns true on success, false is treated as permanent error and the
    // loop calling this function should end.
    bool wait_task(bthread_t* tid);

    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
gejun's avatar
gejun committed
211
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
212 213 214 215
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }
gejun's avatar
gejun committed
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231

#ifndef NDEBUG
    int _sched_recursive_guard;
#endif

    TaskMeta* _cur_meta;
    
    // the control that this group belongs to
    TaskControl* _control;
    int _num_nosignal;
    int _nsignaled;
    // last scheduling time
    int64_t _last_run_ns;
    int64_t _cumulated_cputime_ns;

    size_t _nswitch;
232
    RemainedFn _last_context_remained;
gejun's avatar
gejun committed
233 234
    void* _last_context_remained_arg;

235
    ParkingLot* _pl;
gejun's avatar
gejun committed
236
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
237 238
    ParkingLot::State _last_pl_state;
#endif
gejun's avatar
gejun committed
239 240
    size_t _steal_seed;
    size_t _steal_offset;
gejun's avatar
gejun committed
241
    ContextualStack* _main_stack;
gejun's avatar
gejun committed
242
    bthread_t _main_tid;
gejun's avatar
gejun committed
243
    WorkStealingQueue<bthread_t> _rq;
244 245 246
    RemoteTaskQueue _remote_rq;
    int _remote_num_nosignal;
    int _remote_nsignaled;
gejun's avatar
gejun committed
247 248 249 250 251 252
};

}  // namespace bthread

#include "task_group_inl.h"

#endif  // BTHREAD_TASK_GROUP_H