// bthread - A M:N threading library to make applications more concurrent. // Copyright (c) 2012 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Author: Ge,Jun (gejun@baidu.com) // Date: Tue Jul 10 17:40:58 CST 2012 #ifndef BTHREAD_TASK_CONTROL_H #define BTHREAD_TASK_CONTROL_H #ifndef NDEBUG #include <iostream> // std::ostream #endif #include <stddef.h> // size_t #include "butil/atomicops.h" // butil::atomic #include "bvar/bvar.h" // bvar::PassiveStatus #include "bthread/task_meta.h" // TaskMeta #include "butil/resource_pool.h" // ResourcePool #include "bthread/work_stealing_queue.h" // WorkStealingQueue #include "bthread/parking_lot.h" namespace bthread { class TaskGroup; // Control all task groups class TaskControl { friend class TaskGroup; public: TaskControl(); ~TaskControl(); // Must be called before using. `nconcurrency' is # of worker pthreads. int init(int nconcurrency); // Create a TaskGroup in this control. TaskGroup* create_group(); // Steal a task from a "random" group. bool steal_task(bthread_t* tid, size_t* seed, size_t offset); // Tell other groups that `n' tasks was just added to caller's runqueue void signal_task(int num_task); // Stop and join worker threads in TaskControl. void stop_and_join(); // Get # of worker threads. int concurrency() const { return _concurrency.load(butil::memory_order_acquire); } void print_rq_sizes(std::ostream& os); double get_cumulated_worker_time(); int64_t get_cumulated_switch_count(); int64_t get_cumulated_signal_count(); // [Not thread safe] Add more worker threads. // Return the number of workers actually added, which may be less then |num| int add_workers(int num); // Choose one TaskGroup (randomly right now). // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(); private: // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. int _add_group(TaskGroup*); int _destroy_group(TaskGroup*); static void delete_task_group(void* arg); static void* worker_thread(void* task_control); bvar::LatencyRecorder& exposed_pending_time(); bvar::LatencyRecorder* create_exposed_pending_time(); butil::atomic<size_t> _ngroup; TaskGroup** _groups; butil::Mutex _modify_group_mutex; bool _stop; butil::atomic<int> _concurrency; std::vector<pthread_t> _workers; bvar::Adder<int64_t> _nworkers; butil::Mutex _pending_time_mutex; butil::atomic<bvar::LatencyRecorder*> _pending_time; bvar::PassiveStatus<double> _cumulated_worker_time; bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second; bvar::PassiveStatus<int64_t> _cumulated_switch_count; bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second; bvar::PassiveStatus<int64_t> _cumulated_signal_count; bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second; bvar::PassiveStatus<std::string> _status; bvar::Adder<int64_t> _nbthreads; static const int PARKING_LOT_NUM = 4; ParkingLot _pl[PARKING_LOT_NUM]; }; inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { bvar::LatencyRecorder* pt = _pending_time.load(butil::memory_order_consume); if (!pt) { pt = create_exposed_pending_time(); } return *pt; } } // namespace bthread #endif // BTHREAD_TASK_CONTROL_H