// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - A M:N threading library to make applications more concurrent.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

#include <sys/types.h>
#include <stddef.h>                         // size_t
#include <gflags/gflags.h>
#include "butil/compat.h"                   // OS_MACOSX
#include "butil/macros.h"                   // ARRAY_SIZE
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
#include "butil/fast_rand.h"
#include "butil/unique_ptr.h"
#include "butil/third_party/murmurhash3/murmurhash3.h" // fmix64
#include "bthread/errno.h"                  // ESTOP
#include "bthread/butex.h"                  // butex_*
#include "bthread/sys_futex.h"              // futex_wake_private
#include "bthread/processor.h"              // cpu_relax
#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"

namespace bthread {

static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
    BTHREAD_STACKTYPE_UNKNOWN, 0, NULL };

static bool pass_bool(const char*, bool) { return true; }

DEFINE_bool(show_bthread_creation_in_vars, false, "When this flag is on, the time "
            "from bthread creation to first run will be recorded and shown "
            "in /vars");
const bool ALLOW_UNUSED dummy_show_bthread_creation_in_vars =
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_bthread_creation_in_vars,
                                    pass_bool);

DEFINE_bool(show_per_worker_usage_in_vars, false,
            "Show per-worker usage in /vars/bthread_per_worker_usage_<tid>");
const bool ALLOW_UNUSED dummy_show_per_worker_usage_in_vars =
    ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_show_per_worker_usage_in_vars,
                                    pass_bool);

__thread TaskGroup* tls_task_group = NULL;
// Sync with TaskMeta::local_storage when a bthread is created or destroyed.
// While a bthread is running, the two fields may be inconsistent; use tls_bls
// as the ground truth.
__thread LocalStorage tls_bls = BTHREAD_LOCAL_STORAGE_INITIALIZER;

// defined in bthread/key.cpp
extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);

// [Hacky] This is a special TLS set by bthread-rpc privately to save the
// overhead of creating a keytable; it may be removed later.
BAIDU_THREAD_LOCAL void* tls_unique_user_ptr = NULL;

const TaskStatistics EMPTY_STAT = { 0, 0 };

const size_t OFFSET_TABLE[] = {
#include "bthread/offset_inl.list"
};
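
// Validity check used throughout this file: a bthread_t embeds the version it
// was created with (see make_tid), and *TaskMeta::version_butex is bumped in
// task_runner() when the bthread ends. Comparing the two under version_lock
// tells whether the tid still refers to a live bthread; a mismatch means the
// slot may already be reused by another bthread.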

int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            *out = m->attr;
            return 0;
        }
    }
    errno = EINVAL;
    return -1;
}

void TaskGroup::set_stopped(bthread_t tid) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            m->stop = true;
        }
    }
}

bool TaskGroup::is_stopped(bthread_t tid) {
    TaskMeta* const m = address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            return m->stop;
        }
    }
    // If the tid does not exist or version does not match, it's intuitive
    // to treat the thread as "stopped".
    return true;
}

bool TaskGroup::wait_task(bthread_t* tid) {
    do {
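        // Check for a stop request, try to steal a task, and otherwise park on
        // the ParkingLot. Waiting is done against a previously observed state,
        // so a wake-up issued between the check and the wait changes the state
        // and makes wait() return immediately, i.e. no signal is lost. (When
        // parking state is saved, _last_pl_state is expected to be refreshed
        // inside steal_task().)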
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
#else
        const ParkingLot::State st = _pl->get_state();
        if (st.stopped()) {
            return false;
        }
        if (steal_task(tid)) {
            return true;
        }
        _pl->wait(st);
#endif
    } while (true);
}

static double get_cumulated_cputime_from_this(void* arg) {
    return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
}

void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
    
    TaskGroup* dummy = this;
    bthread_t tid;
    while (wait_task(&tid)) {
        TaskGroup::sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            TaskGroup::task_runner(1/*skip remained*/);
        }
        if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
            char name[32];
#if defined(OS_MACOSX)
            snprintf(name, sizeof(name), "bthread_worker_usage_%" PRIu64,
                     pthread_numeric_id());
#else
            snprintf(name, sizeof(name), "bthread_worker_usage_%ld",
                     (long)syscall(SYS_gettid));
#endif
            usage_bvar.reset(new bvar::PerSecond<bvar::PassiveStatus<double> >
                             (name, &cumulated_cputime, 1));
        }
    }
    // stop_main_task() was called.
    // Don't forget to count the time elapsed during the last wait_task().
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

TaskGroup::TaskGroup(TaskControl* c)
    :
#ifndef NDEBUG
    _sched_recursive_guard(0),
#endif
    _cur_meta(NULL)
    , _control(c)
    , _num_nosignal(0)
    , _nsignaled(0)
    , _last_run_ns(butil::cpuwide_time_ns())
    , _cumulated_cputime_ns(0)
    , _nswitch(0)
    , _last_context_remained(NULL)
    , _last_context_remained_arg(NULL)
    , _pl(NULL) 
    , _main_stack(NULL)
    , _main_tid(0)
    , _remote_num_nosignal(0)
    , _remote_nsignaled(0)
{
    _steal_seed = butil::fast_rand();
    _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
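    // Spread workers over the parking lots by hashing the pthread id.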
    _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) % TaskControl::PARKING_LOT_NUM];
    CHECK(c);
}

TaskGroup::~TaskGroup() {
    if (_main_tid) {
        TaskMeta* m = address_meta(_main_tid);
        CHECK(_main_stack == m->stack);
        return_stack(m->release_stack());
        return_resource(get_slot(_main_tid));
        _main_tid = 0;
    }
}

int TaskGroup::init(size_t runqueue_capacity) {
    if (_rq.init(runqueue_capacity) != 0) {
        LOG(FATAL) << "Fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
        LOG(FATAL) << "Fail to init _remote_rq";
        return -1;
    }
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    if (NULL == stk) {
        LOG(FATAL) << "Fail to get main stack container";
        return -1;
    }
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    if (NULL == m) {
        LOG(FATAL) << "Fail to get TaskMeta";
        return -1;
    }
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

void TaskGroup::task_runner(intptr_t skip_remained) {
    // NOTE: tls_task_group is volatile since tasks are moved around
    //       different groups.
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = tls_task_group;
        }

#ifndef NDEBUG
        --g->_sched_recursive_guard;
#endif
    }

    do {
        // A task can be stopped before it starts running, in which case we
        // could skip the user function, but that may confuse the user: most
        // tasks have variables remembering the running result, often
        // initialized to values indicating success. If the user function is
        // never called, the variables stay unchanged even though they should
        // reflect failure because the task was stopped abnormally.

        // Meta and identifier of the task are persistent in this run.
        TaskMeta* const m = g->_cur_meta;

        if (FLAGS_show_bthread_creation_in_vars) {
            // NOTE: the thread triggering exposure of pending time may spend
            // considerable time because a single bvar::LatencyRecorder
            // contains many bvar.
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
        }
        
        // Don't catch exceptions except ExitException, which is for
        // implementing bthread_exit(). User code is intended to crash when an
        // exception is not caught explicitly; this is consistent with other
        // threading libraries.
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        } 
        
        // Group is probably changed
        g = tls_task_group;

        // TODO: Save thread_return
        (void)thread_return;

        // Logging must be done before returning the keytable, since the
        // logging lib uses bthread local storage internally; logging after the
        // return would cause a memory leak.
        // FIXME: the time from quitting fn to here is not counted into cputime
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
            LOG(INFO) << "Finished bthread " << m->tid << ", cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }

        // Clean tls variables, must be done before changing version_butex
        // otherwise another thread just joined this thread may not see side
        // effects of destructing tls variables.
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // After deletion: tls may be set during deletion.
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // optional
        }
        
        // Increase the version and wake up all joiners. If the resulting
        // version is 0, change it to 1 so that a bthread_t is never 0. Any
        // access or join to the bthread after the version change will be
        // rejected. The spinlock is for visibility of TaskGroup::get_attr.
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);

        g->_control->_nbthreads << -1;
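        // The stack of the ended bthread is still in use until the context
        // switch below completes, so releasing it is deferred: the "remained"
        // callback set here runs after the switch, either at the top of
        // task_runner() or inside sched_to().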
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);
        
    } while (g->_cur_meta->tid != g->_main_tid);
    
    // We were called from a worker pthread and there is no more
    // BTHREAD_STACKTYPE_PTHREAD task to run; quit to wait for more tasks.
}

void TaskGroup::_release_last_context(void* arg) {
    TaskMeta* m = static_cast<TaskMeta*>(arg);
    if (m->stack_type() != STACK_TYPE_PTHREAD) {
        return_stack(m->release_stack()/*may be NULL*/);
    } else {
        // it's _main_stack, don't return.
        m->set_stack(NULL);
    }
    return_resource(get_slot(m->tid));
}

int TaskGroup::start_foreground(TaskGroup** pg,
                                bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }

    TaskGroup* g = *pg;
    g->_control->_nbthreads << 1;
    if (g->is_current_pthread_task()) {
        // never create foreground task in pthread.
        g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        // NOSIGNAL affects current task, not the new task.
        RemainedFn fn = NULL;
        if (g->current_task()->about_to_quit) {
            fn = ready_to_run_in_worker_ignoresignal;
        } else {
            fn = ready_to_run_in_worker;
        }
        ReadyToRunArgs args = {
            g->current_tid(),
            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
        };
        g->set_remained(fn, &args);
        TaskGroup::sched_to(pg, m->tid);
    }
    return 0;
}

template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

// Explicit instantiations.
template int
TaskGroup::start_background<true>(bthread_t* __restrict th,
                                  const bthread_attr_t* __restrict attr,
                                  void * (*fn)(void*),
                                  void* __restrict arg);
template int
TaskGroup::start_background<false>(bthread_t* __restrict th,
                                   const bthread_attr_t* __restrict attr,
                                   void * (*fn)(void*),
                                   void* __restrict arg);
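
// A minimal usage sketch (an illustration only, not compiled here; names
// outside this file such as my_fn/my_arg and the TaskControl accessor are
// assumptions): inside a worker the current TaskGroup is used directly, while
// a non-worker pthread goes through the remote queue of a chosen group.
//
//   bthread_t th;
//   if (TaskGroup* g = tls_task_group) {
//       g->start_background<false>(&th, NULL/*attr*/, my_fn, my_arg);
//   } else {  // from a non-worker pthread
//       get_task_control()->choose_one_group()->start_background<true>(
//           &th, NULL/*attr*/, my_fn, my_arg);
//   }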

int TaskGroup::join(bthread_t tid, void** return_value) {
    if (__builtin_expect(!tid, 0)) {  // tid of bthread is never 0.
        return EINVAL;
    }
    TaskMeta* m = address_meta(tid);
    if (__builtin_expect(!m, 0)) {
        // The bthread is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    TaskGroup* g = tls_task_group;
    if (g != NULL && g->current_tid() == tid) {
        // joining self causes indefinite waiting.
        return EINVAL;
    }
    const uint32_t expected_version = get_version(tid);
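    // *version_butex is incremented (and all joiners are woken via
    // butex_wake_except) in task_runner() when the bthread ends, so waiting
    // for the version to change is effectively joining the bthread.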
    while (*m->version_butex == expected_version) {
        if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
    if (return_value) {
        *return_value = NULL;
    }
    return 0;
}

bool TaskGroup::exists(bthread_t tid) {
    if (tid != 0) {  // tid of bthread is never 0.
        TaskMeta* m = address_meta(tid);
        if (m != NULL) {
            return (*m->version_butex == get_version(tid));
        }
    }
    return false;
}

TaskStatistics TaskGroup::main_stat() const {
    TaskMeta* m = address_meta(_main_tid);
    return m ? m->stat : EMPTY_STAT;
}

void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
    // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
    // to 2.9%
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }

    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // also works with pthread_task scheduling to pthread_task, the
            // transferred stack is just _main_stack.
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_type is BTHREAD_STACKTYPE_PTHREAD or we are out of
                // memory. In the latter case, attr is forced to be
                // BTHREAD_STACKTYPE_PTHREAD. This basically means that if we
                // can't allocate a stack, the task runs on the pthread directly.
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}

void TaskGroup::sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // Find next task to run, if none, switch to idle thread of the group.
#ifndef BTHREAD_FAIR_WSQ
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // Jump to main task if there's no task to run.
        next_tid = g->_main_tid;
    }
    sched_to(pg, next_tid);
}

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
#ifndef NDEBUG
    if ((++g->_sched_recursive_guard) > 1) {
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // Save errno so that errno is bthread-specific.
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++ g->_nswitch;
    // Switch to the task
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // Switch tls_bls
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // Logging must be done after switching the local storage, since the
        // logging lib uses bthread local storage internally; doing it earlier
        // would cause a memory leak.
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // probably went to another group, need to assign g again.
                g = tls_task_group;
            }
#ifndef NDEBUG
            else {
                // else a pthread_task is switching to another pthread_task; the
                // stacks can only be equal when they're both _main_stack
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // else because of ending_sched(including pthread_task->pthread_task)
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
    }

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = tls_task_group;
    }

    // Restore errno
    errno = saved_errno;
    tls_unique_user_ptr = saved_unique_user_ptr;

#ifndef NDEBUG
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}

void TaskGroup::destroy_self() {
    if (_control) {
        _control->_destroy_group(this);
        _control = NULL;
    } else {
        CHECK(false);
    }
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
    push_rq(tid);
    if (nosignal) {
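        // Batch the wake-up: the push is only counted here and the accumulated
        // count is signaled later, either by flush_nosignal_tasks() or by the
        // next push without NOSIGNAL, reducing wake syscalls.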
        ++_num_nosignal;
    } else {
        const int additional_signal = _num_nosignal;
        _num_nosignal = 0;
        _nsignaled += 1 + additional_signal;
        _control->signal_task(1 + additional_signal);
    }
}

void TaskGroup::flush_nosignal_tasks() {
    const int val = _num_nosignal;
    if (val) {
        _num_nosignal = 0;
        _nsignaled += val;
        _control->signal_task(val);
    }
}

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);
    }
}

void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
    const int val = _remote_num_nosignal;
    if (!val) {
        locked_mutex.unlock();
        return;
    }
    _remote_num_nosignal = 0;
    _remote_nsignaled += val;
    locked_mutex.unlock();
    _control->signal_task(val);
}

void TaskGroup::ready_to_run_general(bthread_t tid, bool nosignal) {
    if (tls_task_group == this) {
        return ready_to_run(tid, nosignal);
    }
    return ready_to_run_remote(tid, nosignal);
}

void TaskGroup::flush_nosignal_tasks_general() {
    if (tls_task_group == this) {
        return flush_nosignal_tasks();
    }
    return flush_nosignal_tasks_remote();
}

void TaskGroup::ready_to_run_in_worker(void* args_in) {
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
    return tls_task_group->ready_to_run(args->tid, args->nosignal);
}

void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
    return tls_task_group->push_rq(args->tid);
}

struct SleepArgs {
    uint64_t timeout_us;
    bthread_t tid;
    TaskMeta* meta;
    TaskGroup* group;
};

static void ready_to_run_from_timer_thread(void* arg) {
    CHECK(tls_task_group == NULL);
    const SleepArgs* e = static_cast<const SleepArgs*>(arg);
    e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}

void TaskGroup::_add_sleep_event(void* void_args) {
    // Must copy SleepArgs. After calling TimerThread::schedule(), previous
    // thread may be stolen by a worker immediately and the on-stack SleepArgs
    // will be gone.
    SleepArgs e = *static_cast<SleepArgs*>(void_args);
    TaskGroup* g = e.group;
    
    TimerThread::TaskId sleep_id;
    sleep_id = get_global_timer_thread()->schedule(
        ready_to_run_from_timer_thread, void_args,
        butil::microseconds_from_now(e.timeout_us));

    if (!sleep_id) {
        // Failed to schedule the timer, go back to the previous thread.
        g->ready_to_run(e.tid);
        return;
    }
    
    // Set TaskMeta::current_sleep which is for interruption.
    const uint32_t given_ver = get_version(e.tid);
    {
        BAIDU_SCOPED_LOCK(e.meta->version_lock);
        if (given_ver == *e.meta->version_butex && !e.meta->interrupted) {
            e.meta->current_sleep = sleep_id;
            return;
        }
    }
    // The thread is stopped or interrupted.
    // interrupt() always sees that current_sleep == 0. It will not schedule
    // the calling thread. The race is between current thread and timer thread.
    if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
        // added to timer, previous thread may be already woken up by timer and
        // even stopped. It's safe to schedule previous thread when unschedule()
        // returns 0 which means "the not-run-yet sleep_id is removed". If the
        // sleep_id is running(returns 1), ready_to_run_in_worker() will
        // schedule previous thread as well. If sleep_id does not exist,
        // previous thread is scheduled by timer thread before and we don't
        // have to do it again.
        g->ready_to_run(e.tid);
    }
}

// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
    if (0 == timeout_us) {
        yield(pg);
        return 0;
    }
    TaskGroup* g = *pg;
    // We have to schedule timer after we switched to next bthread otherwise
    // the timer may wake up(jump to) current still-running context.
    SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };
    g->set_remained(_add_sleep_event, &e);
    sched(pg);
    g = *pg;
    e.meta->current_sleep = 0;
    if (e.meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        e.meta->interrupted = false;
        // NOTE: setting errno to ESTOP is not necessary from bthread's
        // perspective, however a lot of RPC code expects bthread_usleep to set
        // errno to ESTOP when the thread is stopping, and print FATAL
        // otherwise. To make smooth transitions, ESTOP is still set instead
        // of EINTR when the thread is stopping.
        errno = (e.meta->stop ? ESTOP : EINTR);
        return -1;
    }
    return 0;
}

// Defined in butex.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw);

static int interrupt_and_consume_waiters(
    bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m == NULL) {
        return EINVAL;
    }
    const uint32_t given_ver = get_version(tid);
    BAIDU_SCOPED_LOCK(m->version_lock);
    if (given_ver == *m->version_butex) {
        *pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire);
        *sleep_id = m->current_sleep;
        m->current_sleep = 0;  // only one stopper gets the sleep_id
        m->interrupted = true;
        return 0;
    }
    return EINVAL;
}

static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m != NULL) {
        const uint32_t given_ver = get_version(tid);
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            // Release fence makes m->interrupted visible to butex_wait
            m->current_waiter.store(w, butil::memory_order_release);
            return 0;
        }
    }
    return EINVAL;
}

// The interruption is "persistent" compared to the ones caused by signals:
// if a bthread is interrupted while it's not blocked, the interruption is
// still remembered and will be checked at the next blocking. This design
// choice simplifies the implementation and reduces notification loss caused
// by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
    // Consume current_waiter in the TaskMeta, wake it up then set it back.
    ButexWaiter* w = NULL;
    uint64_t sleep_id = 0;
    int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);
    if (rc) {
        return rc;
    }
    // A bthread cannot wait on a butex and sleep on a timer at the same time.
    CHECK(!sleep_id || !w);
    if (w != NULL) {
        erase_from_butex_because_of_interruption(w);
        // If butex_wait() already wakes up before we set current_waiter back,
        // the function will spin until current_waiter becomes non-NULL.
        rc = set_butex_waiter(tid, w);
        if (rc) {
            LOG(FATAL) << "butex_wait should spin until setting back waiter";
            return rc;
        }
    } else if (sleep_id != 0) {
        if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
            bthread::TaskGroup* g = bthread::tls_task_group;
            if (g) {
                g->ready_to_run(tid);
            } else {
                if (!c) {
                    return EINVAL;
                }
                c->choose_one_group()->ready_to_run_remote(tid);
            }
        }
    }
    return 0;
}

void TaskGroup::yield(TaskGroup** pg) {
    TaskGroup* g = *pg;
    ReadyToRunArgs args = { g->current_tid(), false };
    g->set_remained(ready_to_run_in_worker, &args);
    sched(pg);
}

void print_task(std::ostream& os, bthread_t tid) {
    TaskMeta* const m = TaskGroup::address_meta(tid);
    if (m == NULL) {
        os << "bthread=" << tid << " : never existed";
        return;
    }
    const uint32_t given_ver = get_version(tid);
    bool matched = false;
    bool stop = false;
    bool interrupted = false;
    bool about_to_quit = false;
    void* (*fn)(void*) = NULL;
    void* arg = NULL;
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
    bool has_tls = false;
    int64_t cpuwide_start_ns = 0;
    TaskStatistics stat = {0, 0};
    {
        BAIDU_SCOPED_LOCK(m->version_lock);
        if (given_ver == *m->version_butex) {
            matched = true;
            stop = m->stop;
            interrupted = m->interrupted;
            about_to_quit = m->about_to_quit;
            fn = m->fn;
            arg = m->arg;
            attr = m->attr;
            has_tls = m->local_storage.keytable;
            cpuwide_start_ns = m->cpuwide_start_ns;
            stat = m->stat;
        }
    }
    if (!matched) {
        os << "bthread=" << tid << " : not exist now";
    } else {
        os << "bthread=" << tid << " :\nstop=" << stop
           << "\ninterrupted=" << interrupted
           << "\nabout_to_quit=" << about_to_quit
           << "\nfn=" << (void*)fn
           << "\narg=" << (void*)arg
           << "\nattr={stack_type=" << attr.stack_type
           << " flags=" << attr.flags
           << " keytable_pool=" << attr.keytable_pool 
           << "}\nhas_tls=" << has_tls
           << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
           << "\ncputime_ns=" << stat.cputime_ns
           << "\nnswitch=" << stat.nswitch;
    }
}

}  // namespace bthread