task_group.h 9.15 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2012 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

gejun's avatar
gejun committed
19 20
#ifndef BTHREAD_TASK_GROUP_H
#define BTHREAD_TASK_GROUP_H
gejun's avatar
gejun committed
21

22
#include "butil/time.h"                             // cpuwide_time_ns
23
#include "bthread/task_control.h"
gejun's avatar
gejun committed
24 25
#include "bthread/task_meta.h"                     // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h"           // WorkStealingQueue
26
#include "bthread/remote_task_queue.h"             // RemoteTaskQueue
27
#include "butil/resource_pool.h"                    // ResourceId
28
#include "bthread/parking_lot.h"
gejun's avatar
gejun committed
29 30 31

namespace bthread {

32
// For exiting a bthread.
gejun's avatar
gejun committed
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
class ExitException : public std::exception {
public:
    explicit ExitException(void* value) : _value(value) {}
    ~ExitException() throw() {}
    const char* what() const throw() {
        return "ExitException";
    }
    void* value() const {
        return _value;
    }
private:
    void* _value;
};

// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in following
// function are updated before returning.
class TaskGroup {
public:
    // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
    // the identifier into `tid'. Switch to the new task and schedule old task
    // to run.
    // Return 0 on success, errno otherwise.
    static int start_foreground(TaskGroup** pg,
                                bthread_t* __restrict tid,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg);

    // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
    // identifier into `tid'. Schedule the new thread to run.
65 66
    //   Called from worker: start_background<false>
    //   Called from non-worker: start_background<true>
gejun's avatar
gejun committed
67
    // Return 0 on success, errno otherwise.
68
    template <bool REMOTE>
gejun's avatar
gejun committed
69 70 71 72 73 74 75 76 77 78
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg);

    // Suspend caller and run next bthread in TaskGroup *pg.
    static void sched(TaskGroup** pg);
    static void ending_sched(TaskGroup** pg);

    // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
79 80
    // Purpose of this function is to avoid pushing `next_tid' to _rq and
    // then being popped by sched(pg), which is not necessary.
gejun's avatar
gejun committed
81
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
82 83 84 85 86 87
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
    static void exchange(TaskGroup** pg, bthread_t next_tid);

    // The callback will be run in the beginning of next-run bthread.
    // Can't be called by current bthread directly because it often needs
    // the target to be suspended already.
88 89
    typedef void (*RemainedFn)(void*);
    void set_remained(RemainedFn cb, void* arg) {
90 91 92
        _last_context_remained = cb;
        _last_context_remained_arg = arg;
    }
gejun's avatar
gejun committed
93 94 95 96 97
    
    // Suspend caller for at least |timeout_us| microseconds.
    // If |timeout_us| is 0, this function does nothing.
    // If |group| is NULL or current thread is non-bthread, call usleep(3)
    // instead. This function does not create thread-local TaskGroup.
98
    // Returns: 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
99 100
    static int usleep(TaskGroup** pg, uint64_t timeout_us);

101 102
    // Suspend caller and run another bthread. When the caller will resume
    // is undefined.
103
    static void yield(TaskGroup** pg);
gejun's avatar
gejun committed
104 105 106 107 108 109 110 111 112 113 114 115

    // Suspend caller until bthread `tid' terminates.
    static int join(bthread_t tid, void** return_value);

    // Returns true iff the bthread `tid' still exists. Notice that it is
    // just the result at this very moment which may change soon.
    // Don't use this function unless you have to. Never write code like this:
    //    if (exists(tid)) {
    //        Wait for events of the thread.   // Racy, may block indefinitely.
    //    }
    static bool exists(bthread_t tid);

116 117
    // Put attribute associated with `tid' into `*attr'.
    // Returns 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
118
    static int get_attr(bthread_t tid, bthread_attr_t* attr);
119

120 121 122
    // Get/set TaskMeta.stop of the tid.
    static void set_stopped(bthread_t tid);
    static bool is_stopped(bthread_t tid);
gejun's avatar
gejun committed
123

124
    // The bthread running run_main_task();
gejun's avatar
gejun committed
125 126 127 128 129
    bthread_t main_tid() const { return _main_tid; }
    TaskStatistics main_stat() const;
    // Routine of the main task which should be called from a dedicated pthread.
    void run_main_task();

zyearn's avatar
zyearn committed
130 131 132 133
    // current_task is a function in macOS 10.0+
#ifdef current_task
#undef current_task
#endif
134
    // Meta/Identifier of current task in this group.
gejun's avatar
gejun committed
135 136
    TaskMeta* current_task() const { return _cur_meta; }
    bthread_t current_tid() const { return _cur_meta->tid; }
137 138
    // Uptime of current task in nanoseconds.
    int64_t current_uptime_ns() const
139
    { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
gejun's avatar
gejun committed
140

141 142 143 144
    // True iff current task is the one running run_main_task()
    bool is_current_main_task() const { return current_tid() == _main_tid; }
    // True iff current task is in pthread-mode.
    bool is_current_pthread_task() const
gejun's avatar
gejun committed
145
    { return _cur_meta->stack == _main_stack; }
gejun's avatar
gejun committed
146

147 148
    // Active time in nanoseconds spent by this TaskGroup.
    int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
gejun's avatar
gejun committed
149

150
    // Push a bthread into the runqueue
151
    void ready_to_run(bthread_t tid, bool nosignal = false);
152 153
    // Flush tasks pushed to rq but signalled.
    void flush_nosignal_tasks();
gejun's avatar
gejun committed
154

155 156
    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);
157
    void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
158 159 160 161 162 163 164 165
    void flush_nosignal_tasks_remote();

    // Automatically decide the caller is remote or local, and call
    // the corresponding function.
    void ready_to_run_general(bthread_t tid, bool nosignal = false);
    void flush_nosignal_tasks_general();

    // The TaskControl that this TaskGroup belongs to.
gejun's avatar
gejun committed
166 167 168 169 170
    TaskControl* control() const { return _control; }

    // Call this instead of delete.
    void destroy_self();

171 172 173
    // Wake up blocking ops in the thread.
    // Returns 0 on success, errno otherwise.
    static int interrupt(bthread_t tid, TaskControl* c);
gejun's avatar
gejun committed
174

175 176
    // Get the meta associate with the task.
    static TaskMeta* address_meta(bthread_t tid);
gejun's avatar
gejun committed
177

178 179 180
    // Push a task into _rq, if _rq is full, retry after some time. This
    // process make go on indefinitely.
    void push_rq(bthread_t tid);
gejun's avatar
gejun committed
181 182

private:
183 184
friend class TaskControl;

gejun's avatar
gejun committed
185 186 187 188 189 190 191 192 193 194 195
    // You shall use TaskControl::create_group to create new instance.
    explicit TaskGroup(TaskControl*);

    int init(size_t runqueue_capacity);

    // You shall call destroy_self() instead of destructor because deletion
    // of groups are postponed to avoid race.
    ~TaskGroup();

    static void task_runner(intptr_t skip_remained);

196
    // Callbacks for set_remained()
gejun's avatar
gejun committed
197
    static void _release_last_context(void*);
198
    static void _add_sleep_event(void*);
199 200 201 202
    struct ReadyToRunArgs {
        bthread_t tid;
        bool nosignal;
    };
gejun's avatar
gejun committed
203
    static void ready_to_run_in_worker(void*);
204 205 206 207 208 209 210 211 212 213 214
    static void ready_to_run_in_worker_ignoresignal(void*);

    // Wait for a task to run.
    // Returns true on success, false is treated as permanent error and the
    // loop calling this function should end.
    bool wait_task(bthread_t* tid);

    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
gejun's avatar
gejun committed
215
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
216 217 218 219
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }
gejun's avatar
gejun committed
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

#ifndef NDEBUG
    int _sched_recursive_guard;
#endif

    TaskMeta* _cur_meta;
    
    // the control that this group belongs to
    TaskControl* _control;
    int _num_nosignal;
    int _nsignaled;
    // last scheduling time
    int64_t _last_run_ns;
    int64_t _cumulated_cputime_ns;

    size_t _nswitch;
236
    RemainedFn _last_context_remained;
gejun's avatar
gejun committed
237 238
    void* _last_context_remained_arg;

239
    ParkingLot* _pl;
gejun's avatar
gejun committed
240
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
241 242
    ParkingLot::State _last_pl_state;
#endif
gejun's avatar
gejun committed
243 244
    size_t _steal_seed;
    size_t _steal_offset;
gejun's avatar
gejun committed
245
    ContextualStack* _main_stack;
gejun's avatar
gejun committed
246
    bthread_t _main_tid;
gejun's avatar
gejun committed
247
    WorkStealingQueue<bthread_t> _rq;
248 249 250
    RemoteTaskQueue _remote_rq;
    int _remote_num_nosignal;
    int _remote_nsignaled;
gejun's avatar
gejun committed
251 252 253 254 255 256
};

}  // namespace bthread

#include "task_group_inl.h"

gejun's avatar
gejun committed
257
#endif  // BTHREAD_TASK_GROUP_H