1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2012 Baidu.com, Inc. All Rights Reserved
// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012
#ifndef BAIDU_BTHREAD_TASK_CONTROL_H
#define BAIDU_BTHREAD_TASK_CONTROL_H
#ifndef NDEBUG
#include <iostream> // std::ostream
#endif
#include <pthread.h> // pthread_t
#include <stddef.h> // size_t
#include <stdint.h> // int64_t
#include <iosfwd> // std::ostream (declaration needed in all build modes)
#include <string> // std::string
#include <vector> // std::vector
#include "base/atomicops.h" // base::atomic
#include "bvar/bvar.h" // bvar::PassiveStatus
#include "bthread/task_meta.h" // TaskMeta
#include "base/resource_pool.h" // ResourcePool
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/parking_lot.h"
namespace bthread {
// Forward declaration: this header only names TaskGroup through pointers and
// a friend declaration, so the full definition is not needed here.
class TaskGroup;
// Control all task groups.
//
// Typical usage, as far as this header shows: construct, call init() with the
// desired number of worker pthreads, and call stop_and_join() to shut the
// workers down. TaskGroup is a friend and may reach the private members.
class TaskControl {
    friend class TaskGroup;

public:
    TaskControl();
    ~TaskControl();

    // Must be called before using. `nconcurrency' is # of worker pthreads.
    int init(int nconcurrency);

    // Create a TaskGroup in this control.
    TaskGroup* create_group();

    // Steal a task from a "random" group.
    // NOTE(review): `tid' presumably receives the stolen task and `seed' /
    // `offset' drive the pseudo-random walk over groups -- confirm in the .cpp.
    bool steal_task(bthread_t* tid, size_t* seed, size_t offset);

    // Tell other groups that `num_task' tasks were just added to caller's
    // runqueue.
    void signal_task(int num_task);

    // Stop and join worker threads in TaskControl.
    void stop_and_join();

    // Get # of worker threads.
    int concurrency() const
    { return _concurrency.load(base::memory_order_acquire); }

    // Judging by the name, writes the sizes of the run queues to `os';
    // the exact format is defined in the .cpp.
    void print_rq_sizes(std::ostream& os);

    // Accessors for cumulated statistics. The matching bvar members below
    // (_cumulated_worker_time etc.) presumably read through these -- confirm
    // in the .cpp.
    double get_cumulated_worker_time();
    int64_t get_cumulated_switch_count();
    int64_t get_cumulated_signal_count();

    // [Not thread safe] Add more worker threads.
    // Return the number of workers actually added, which may be less than
    // |num|.
    int add_workers(int num);

    // Choose one TaskGroup (randomly right now).
    // If this method is called after init(), it never returns NULL.
    TaskGroup* choose_one_group();

private:
    // Not copyable. The class has a user-declared destructor and stores a raw
    // `TaskGroup**' plus pthread handles; a member-wise copy would alias them.
    // Declared but intentionally left undefined (pre-C++11 idiom), so any
    // accidental copy fails at compile time (or at link time from friends).
    TaskControl(const TaskControl&);
    TaskControl& operator=(const TaskControl&);

    // Add/Remove a TaskGroup.
    // Returns 0 on success, -1 otherwise.
    int _add_group(TaskGroup*);
    int _destroy_group(TaskGroup*);

    // Static callbacks taking an opaque `void*' argument.
    // NOTE(review): worker_thread has the pthread start-routine signature and
    // its argument is named `task_control'; presumably it is the entry point
    // of each worker pthread -- confirm in the .cpp.
    static void delete_task_group(void* arg);
    static void* worker_thread(void* task_control);

    // Return the pending-time recorder; see the inline definition below the
    // class. create_exposed_pending_time() is the fallback used when the
    // recorder pointer is still null.
    bvar::LatencyRecorder& exposed_pending_time();
    bvar::LatencyRecorder* create_exposed_pending_time();

    // NOTE: the declaration order of the data members below is also their
    // initialization order; do not reorder without checking the constructor.
    base::atomic<size_t> _ngroup;
    TaskGroup** _groups;
    base::Mutex _modify_group_mutex;

    bool _stop;
    // Value reported by concurrency().
    base::atomic<int> _concurrency;
    std::vector<pthread_t> _workers;

    bvar::Adder<int64_t> _nworkers;
    base::Mutex _pending_time_mutex;
    // Loaded by exposed_pending_time(); may be null until a recorder exists.
    base::atomic<bvar::LatencyRecorder*> _pending_time;
    bvar::PassiveStatus<double> _cumulated_worker_time;
    bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;
    bvar::PassiveStatus<int64_t> _cumulated_switch_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;
    bvar::PassiveStatus<int64_t> _cumulated_signal_count;
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;
    bvar::PassiveStatus<std::string> _status;
    bvar::Adder<int64_t> _nbthreads;

    // Fixed-size array of parking lots; its length is PARKING_LOT_NUM.
    static const int PARKING_LOT_NUM = 4;
    ParkingLot _pl[PARKING_LOT_NUM];
};
// Return the pending-time latency recorder.
// The pointer currently stored in `_pending_time' is used when it is non-null;
// otherwise the result of create_exposed_pending_time() is returned instead.
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
    bvar::LatencyRecorder* recorder =
        _pending_time.load(base::memory_order_consume);
    if (recorder) {
        // Already available: no need to go through the creation path.
        return *recorder;
    }
    return *create_exposed_pending_time();
}
} // namespace bthread
#endif // BAIDU_BTHREAD_TASK_CONTROL_H