task_group.h 9.37 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

gejun's avatar
gejun committed
18 19 20 21 22
// bthread - A M:N threading library to make applications more concurrent.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

gejun's avatar
gejun committed
23 24
#ifndef BTHREAD_TASK_GROUP_H
#define BTHREAD_TASK_GROUP_H
gejun's avatar
gejun committed
25

26
#include "butil/time.h"                             // cpuwide_time_ns
27
#include "bthread/task_control.h"
gejun's avatar
gejun committed
28 29
#include "bthread/task_meta.h"                     // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h"           // WorkStealingQueue
30
#include "bthread/remote_task_queue.h"             // RemoteTaskQueue
31
#include "butil/resource_pool.h"                    // ResourceId
32
#include "bthread/parking_lot.h"
gejun's avatar
gejun committed
33 34 35

namespace bthread {

36
// For exiting a bthread.
gejun's avatar
gejun committed
37 38 39 40
class ExitException : public std::exception {
public:
    explicit ExitException(void* value) : _value(value) {}
    ~ExitException() throw() {}
gejun's avatar
gejun committed
41
    const char* what() const throw() override {
gejun's avatar
gejun committed
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
        return "ExitException";
    }
    void* value() const {
        return _value;
    }
private:
    void* _value;
};

// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in following
// function are updated before returning.
class TaskGroup {
public:
    // Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
    // the identifier into `tid'. Switch to the new task and schedule old task
    // to run.
    // Return 0 on success, errno otherwise.
    static int start_foreground(TaskGroup** pg,
                                bthread_t* __restrict tid,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg);

    // Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
    // identifier into `tid'. Schedule the new thread to run.
69 70
    //   Called from worker: start_background<false>
    //   Called from non-worker: start_background<true>
gejun's avatar
gejun committed
71
    // Return 0 on success, errno otherwise.
72
    template <bool REMOTE>
gejun's avatar
gejun committed
73 74 75 76 77 78 79 80 81 82
    int start_background(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg);

    // Suspend caller and run next bthread in TaskGroup *pg.
    static void sched(TaskGroup** pg);
    static void ending_sched(TaskGroup** pg);

    // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
83 84
    // Purpose of this function is to avoid pushing `next_tid' to _rq and
    // then being popped by sched(pg), which is not necessary.
gejun's avatar
gejun committed
85
    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
86 87 88 89 90 91
    static void sched_to(TaskGroup** pg, bthread_t next_tid);
    static void exchange(TaskGroup** pg, bthread_t next_tid);

    // The callback will be run in the beginning of next-run bthread.
    // Can't be called by current bthread directly because it often needs
    // the target to be suspended already.
92 93
    typedef void (*RemainedFn)(void*);
    void set_remained(RemainedFn cb, void* arg) {
94 95 96
        _last_context_remained = cb;
        _last_context_remained_arg = arg;
    }
gejun's avatar
gejun committed
97 98 99 100 101
    
    // Suspend caller for at least |timeout_us| microseconds.
    // If |timeout_us| is 0, this function does nothing.
    // If |group| is NULL or current thread is non-bthread, call usleep(3)
    // instead. This function does not create thread-local TaskGroup.
102
    // Returns: 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
103 104
    static int usleep(TaskGroup** pg, uint64_t timeout_us);

105 106
    // Suspend caller and run another bthread. When the caller will resume
    // is undefined.
107
    static void yield(TaskGroup** pg);
gejun's avatar
gejun committed
108 109 110 111 112 113 114 115 116 117 118 119

    // Suspend caller until bthread `tid' terminates.
    static int join(bthread_t tid, void** return_value);

    // Returns true iff the bthread `tid' still exists. Notice that it is
    // just the result at this very moment which may change soon.
    // Don't use this function unless you have to. Never write code like this:
    //    if (exists(tid)) {
    //        Wait for events of the thread.   // Racy, may block indefinitely.
    //    }
    static bool exists(bthread_t tid);

120 121
    // Put attribute associated with `tid' into `*attr'.
    // Returns 0 on success, -1 otherwise and errno is set.
gejun's avatar
gejun committed
122
    static int get_attr(bthread_t tid, bthread_attr_t* attr);
123

124 125 126
    // Get/set TaskMeta.stop of the tid.
    static void set_stopped(bthread_t tid);
    static bool is_stopped(bthread_t tid);
gejun's avatar
gejun committed
127

128
    // The bthread running run_main_task();
gejun's avatar
gejun committed
129 130 131 132 133
    bthread_t main_tid() const { return _main_tid; }
    TaskStatistics main_stat() const;
    // Routine of the main task which should be called from a dedicated pthread.
    void run_main_task();

zyearn's avatar
zyearn committed
134 135 136 137
    // current_task is a function in macOS 10.0+
#ifdef current_task
#undef current_task
#endif
138
    // Meta/Identifier of current task in this group.
gejun's avatar
gejun committed
139 140
    TaskMeta* current_task() const { return _cur_meta; }
    bthread_t current_tid() const { return _cur_meta->tid; }
141 142
    // Uptime of current task in nanoseconds.
    int64_t current_uptime_ns() const
143
    { return butil::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
gejun's avatar
gejun committed
144

145 146 147 148
    // True iff current task is the one running run_main_task()
    bool is_current_main_task() const { return current_tid() == _main_tid; }
    // True iff current task is in pthread-mode.
    bool is_current_pthread_task() const
gejun's avatar
gejun committed
149
    { return _cur_meta->stack == _main_stack; }
gejun's avatar
gejun committed
150

151 152
    // Active time in nanoseconds spent by this TaskGroup.
    int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
gejun's avatar
gejun committed
153

154
    // Push a bthread into the runqueue
155
    void ready_to_run(bthread_t tid, bool nosignal = false);
156 157
    // Flush tasks pushed to rq but signalled.
    void flush_nosignal_tasks();
gejun's avatar
gejun committed
158

159 160
    // Push a bthread into the runqueue from another non-worker thread.
    void ready_to_run_remote(bthread_t tid, bool nosignal = false);
161
    void flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex);
162 163 164 165 166 167 168 169
    void flush_nosignal_tasks_remote();

    // Automatically decide the caller is remote or local, and call
    // the corresponding function.
    void ready_to_run_general(bthread_t tid, bool nosignal = false);
    void flush_nosignal_tasks_general();

    // The TaskControl that this TaskGroup belongs to.
gejun's avatar
gejun committed
170 171 172 173 174
    TaskControl* control() const { return _control; }

    // Call this instead of delete.
    void destroy_self();

175 176 177
    // Wake up blocking ops in the thread.
    // Returns 0 on success, errno otherwise.
    static int interrupt(bthread_t tid, TaskControl* c);
gejun's avatar
gejun committed
178

179 180
    // Get the meta associate with the task.
    static TaskMeta* address_meta(bthread_t tid);
gejun's avatar
gejun committed
181

182 183 184
    // Push a task into _rq, if _rq is full, retry after some time. This
    // process make go on indefinitely.
    void push_rq(bthread_t tid);
gejun's avatar
gejun committed
185 186

private:
187 188
friend class TaskControl;

gejun's avatar
gejun committed
189 190 191 192 193 194 195 196 197 198 199
    // You shall use TaskControl::create_group to create new instance.
    explicit TaskGroup(TaskControl*);

    int init(size_t runqueue_capacity);

    // You shall call destroy_self() instead of destructor because deletion
    // of groups are postponed to avoid race.
    ~TaskGroup();

    static void task_runner(intptr_t skip_remained);

200
    // Callbacks for set_remained()
gejun's avatar
gejun committed
201
    static void _release_last_context(void*);
202
    static void _add_sleep_event(void*);
203 204 205 206
    struct ReadyToRunArgs {
        bthread_t tid;
        bool nosignal;
    };
gejun's avatar
gejun committed
207
    static void ready_to_run_in_worker(void*);
208 209 210 211 212 213 214 215 216 217 218
    static void ready_to_run_in_worker_ignoresignal(void*);

    // Wait for a task to run.
    // Returns true on success, false is treated as permanent error and the
    // loop calling this function should end.
    bool wait_task(bthread_t* tid);

    bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
gejun's avatar
gejun committed
219
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
220 221 222 223
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }
gejun's avatar
gejun committed
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239

#ifndef NDEBUG
    int _sched_recursive_guard;
#endif

    TaskMeta* _cur_meta;
    
    // the control that this group belongs to
    TaskControl* _control;
    int _num_nosignal;
    int _nsignaled;
    // last scheduling time
    int64_t _last_run_ns;
    int64_t _cumulated_cputime_ns;

    size_t _nswitch;
240
    RemainedFn _last_context_remained;
gejun's avatar
gejun committed
241 242
    void* _last_context_remained_arg;

243
    ParkingLot* _pl;
gejun's avatar
gejun committed
244
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
245 246
    ParkingLot::State _last_pl_state;
#endif
gejun's avatar
gejun committed
247 248
    size_t _steal_seed;
    size_t _steal_offset;
gejun's avatar
gejun committed
249
    ContextualStack* _main_stack;
gejun's avatar
gejun committed
250
    bthread_t _main_tid;
gejun's avatar
gejun committed
251
    WorkStealingQueue<bthread_t> _rq;
252 253 254
    RemoteTaskQueue _remote_rq;
    int _remote_num_nosignal;
    int _remote_nsignaled;
gejun's avatar
gejun committed
255 256 257 258 259 260
};

}  // namespace bthread

#include "task_group_inl.h"

gejun's avatar
gejun committed
261
#endif  // BTHREAD_TASK_GROUP_H