task_control.h 4.02 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2012 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012

gejun's avatar
gejun committed
19 20
#ifndef BTHREAD_TASK_CONTROL_H
#define BTHREAD_TASK_CONTROL_H
gejun's avatar
gejun committed
21 22 23 24 25

#ifndef NDEBUG
#include <iostream>                             // std::ostream
#endif
#include <stddef.h>                             // size_t
26
#include "butil/atomicops.h"                     // butil::atomic
gejun's avatar
gejun committed
27 28
#include "bvar/bvar.h"                          // bvar::PassiveStatus
#include "bthread/task_meta.h"                  // TaskMeta
29
#include "butil/resource_pool.h"                 // ResourcePool
gejun's avatar
gejun committed
30
#include "bthread/work_stealing_queue.h"        // WorkStealingQueue
31
#include "bthread/parking_lot.h"
gejun's avatar
gejun committed
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61

namespace bthread {

class TaskGroup;

// Control all task groups
class TaskControl {
    friend class TaskGroup;

public:
    TaskControl();
    ~TaskControl();

    // Must be called before using. `nconcurrency' is # of worker pthreads.
    int init(int nconcurrency);
    
    // Create a TaskGroup in this control.
    TaskGroup* create_group();

    // Steal a task from a "random" group.
    bool steal_task(bthread_t* tid, size_t* seed, size_t offset);

    // Tell other groups that `n' tasks was just added to caller's runqueue
    void signal_task(int num_task);

    // Stop and join worker threads in TaskControl.
    void stop_and_join();
    
    // Get # of worker threads.
    int concurrency() const 
62
    { return _concurrency.load(butil::memory_order_acquire); }
gejun's avatar
gejun committed
63

gejun's avatar
gejun committed
64
    void print_rq_sizes(std::ostream& os);
gejun's avatar
gejun committed
65 66 67 68 69 70 71 72 73 74 75 76 77

    double get_cumulated_worker_time();
    int64_t get_cumulated_switch_count();
    int64_t get_cumulated_signal_count();

    // [Not thread safe] Add more worker threads.
    // Return the number of workers actually added, which may be less then |num|
    int add_workers(int num);

    // Choose one TaskGroup (randomly right now).
    // If this method is called after init(), it never returns NULL.
    TaskGroup* choose_one_group();

gejun's avatar
gejun committed
78
private:
gejun's avatar
gejun committed
79 80 81 82 83 84 85 86 87 88
    // Add/Remove a TaskGroup.
    // Returns 0 on success, -1 otherwise.
    int _add_group(TaskGroup*);
    int _destroy_group(TaskGroup*);

    static void delete_task_group(void* arg);

    static void* worker_thread(void* task_control);

    bvar::LatencyRecorder& exposed_pending_time();
gejun's avatar
gejun committed
89 90
    bvar::LatencyRecorder* create_exposed_pending_time();

91
    butil::atomic<size_t> _ngroup;
gejun's avatar
gejun committed
92
    TaskGroup** _groups;
93
    butil::Mutex _modify_group_mutex;
gejun's avatar
gejun committed
94 95

    bool _stop;
96
    butil::atomic<int> _concurrency;
gejun's avatar
gejun committed
97 98 99
    std::vector<pthread_t> _workers;

    bvar::Adder<int64_t> _nworkers;
100 101
    butil::Mutex _pending_time_mutex;
    butil::atomic<bvar::LatencyRecorder*> _pending_time;
gejun's avatar
gejun committed
102
    bvar::PassiveStatus<double> _cumulated_worker_time;
gejun's avatar
gejun committed
103
    bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;
gejun's avatar
gejun committed
104 105 106 107 108 109 110
    bvar::PassiveStatus<int64_t> _cumulated_switch_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;
    bvar::PassiveStatus<int64_t> _cumulated_signal_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;
    bvar::PassiveStatus<std::string> _status;
    bvar::Adder<int64_t> _nbthreads;

111 112
    static const int PARKING_LOT_NUM = 4;
    ParkingLot _pl[PARKING_LOT_NUM];
gejun's avatar
gejun committed
113 114
};

gejun's avatar
gejun committed
115
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
116
    bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume);
gejun's avatar
gejun committed
117 118
    if (!pt) {
        pt = create_exposed_pending_time();
gejun's avatar
gejun committed
119
    }
gejun's avatar
gejun committed
120
    return *pt;
gejun's avatar
gejun committed
121 122 123 124
}

}  // namespace bthread

gejun's avatar
gejun committed
125
#endif  // BTHREAD_TASK_CONTROL_H