// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2012 baidu-rpc authors.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

#ifndef BAIDU_BTHREAD_TASK_CONTROL_H
#define BAIDU_BTHREAD_TASK_CONTROL_H

#ifndef NDEBUG
#include <iostream>                             // std::ostream
#endif
#include <pthread.h>                            // pthread_t
#include <stddef.h>                             // size_t
#include <stdint.h>                             // int64_t
#include <iosfwd>                               // std::ostream (declaration)
#include <string>                               // std::string
#include <vector>                               // std::vector
#include "base/atomicops.h"                     // base::atomic
#include "bvar/bvar.h"                          // bvar::PassiveStatus
#include "bthread/task_meta.h"                  // TaskMeta
#include "base/resource_pool.h"                 // ResourcePool
#include "bthread/work_stealing_queue.h"        // WorkStealingQueue
#include "bthread/parking_lot.h"                // ParkingLot

namespace bthread {

class TaskGroup;

// Control all task groups
class TaskControl {
    // TaskGroup is granted access to the private members below.
    friend class TaskGroup;

public:
    TaskControl();
    ~TaskControl();

    // Must be called before using. `nconcurrency' is # of worker pthreads.
    // NOTE(review): presumably returns 0 on success like the other
    // int-returning members of this class - confirm in task_control.cpp.
    int init(int nconcurrency);

    // Create a TaskGroup in this control.
    TaskGroup* create_group();

    // Steal a task from a "random" group.
    // NOTE(review): `*tid' presumably receives the stolen task when true is
    // returned, and `seed'/`offset' presumably drive the pseudo-random choice
    // of the victim group - confirm against the implementation.
    bool steal_task(bthread_t* tid, size_t* seed, size_t offset);

    // Tell other groups that `num_task' tasks were just added to caller's
    // runqueue.
    void signal_task(int num_task);

    // Stop and join worker threads in TaskControl.
    void stop_and_join();

    // Get # of worker threads. Reads _concurrency with acquire ordering.
    int concurrency() const
    { return _concurrency.load(base::memory_order_acquire); }

    // Print the sizes of run queues to `os'.
    // NOTE(review): purpose inferred from the name only - confirm.
    void print_rq_sizes(std::ostream& os);

    // Getters for cumulated statistics.
    // NOTE(review): presumably these back the bvar::PassiveStatus members
    // with the same names declared below - confirm.
    double get_cumulated_worker_time();
    int64_t get_cumulated_switch_count();
    int64_t get_cumulated_signal_count();

    // [Not thread safe] Add more worker threads.
    // Return the number of workers actually added, which may be less than
    // |num|.
    int add_workers(int num);

    // Choose one TaskGroup (randomly right now).
    // If this method is called after init(), it never returns NULL.
    TaskGroup* choose_one_group();

private:
    // Add/Remove a TaskGroup.
    // Returns 0 on success, -1 otherwise.
    int _add_group(TaskGroup*);
    int _destroy_group(TaskGroup*);

    // NOTE(review): `arg' is presumably a TaskGroup* passed through a
    // void*-based callback interface - confirm.
    static void delete_task_group(void* arg);

    // pthread-style entry function. `task_control' is presumably the
    // TaskControl* that started the worker - confirm.
    static void* worker_thread(void* task_control);

    // Returns the recorder behind _pending_time, calling
    // create_exposed_pending_time() when the stored pointer is still null.
    // Defined inline below the class.
    bvar::LatencyRecorder& exposed_pending_time();
    bvar::LatencyRecorder* create_exposed_pending_time();

    // Group bookkeeping.
    // NOTE(review): presumably _ngroup is the number of valid entries in
    // _groups and _modify_group_mutex serializes _add_group/_destroy_group -
    // confirm.
    base::atomic<size_t> _ngroup;
    TaskGroup** _groups;
    base::Mutex _modify_group_mutex;

    // Worker bookkeeping. _concurrency is what concurrency() reports.
    bool _stop;
    base::atomic<int> _concurrency;
    std::vector<pthread_t> _workers;

    // Exposed statistics. Declaration order is kept as-is because it fixes
    // the construction order of these members.
    bvar::Adder<int64_t> _nworkers;
    base::Mutex _pending_time_mutex;
    base::atomic<bvar::LatencyRecorder*> _pending_time;
    bvar::PassiveStatus<double> _cumulated_worker_time;
    bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;
    bvar::PassiveStatus<int64_t> _cumulated_switch_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;
    bvar::PassiveStatus<int64_t> _cumulated_signal_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;
    bvar::PassiveStatus<std::string> _status;
    bvar::Adder<int64_t> _nbthreads;

    // Fixed-size array of parking lots.
    // NOTE(review): presumably workers wait on these and signal_task() wakes
    // them - confirm in task_control.cpp.
    static const int PARKING_LOT_NUM = 4;
    ParkingLot _pl[PARKING_LOT_NUM];
};

// Inline member definitions.
// Returns the LatencyRecorder currently stored in _pending_time. When the
// stored pointer is null, the result of create_exposed_pending_time() is
// returned instead.
// NOTE(review): create_exposed_pending_time() presumably creates the recorder
// and stores it into _pending_time (likely under _pending_time_mutex) -
// confirm in task_control.cpp.
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
    bvar::LatencyRecorder* const recorder =
        _pending_time.load(base::memory_order_consume);
    if (recorder != NULL) {
        // Already available: no need to go through the creation path.
        return *recorder;
    }
    return *create_exposed_pending_time();
}

}  // namespace bthread

#endif  // BAIDU_BTHREAD_TASK_CONTROL_H