Commit 0c2e4e1b authored by gejun's avatar gejun

Add min_concurrency flag and support creating workers lazily

parent c9c974c1
...@@ -29,6 +29,12 @@ namespace bthread { ...@@ -29,6 +29,12 @@ namespace bthread {
DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM, DEFINE_int32(bthread_concurrency, 8 + BTHREAD_EPOLL_THREAD_NUM,
"Number of pthread workers"); "Number of pthread workers");
DEFINE_int32(bthread_min_concurrency, 0,
"Initial number of pthread workers which will be added on-demand."
" The laziness is disabled when this value is non-positive,"
" and workers will be created eagerly according to -bthread_concurrency and bthread_setconcurrency(). ");
static bool never_set_bthread_concurrency = true; static bool never_set_bthread_concurrency = true;
static bool validate_bthread_concurrency(const char*, int32_t val) { static bool validate_bthread_concurrency(const char*, int32_t val) {
...@@ -40,6 +46,12 @@ const int ALLOW_UNUSED register_FLAGS_bthread_concurrency = ...@@ -40,6 +46,12 @@ const int ALLOW_UNUSED register_FLAGS_bthread_concurrency =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency, ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency,
validate_bthread_concurrency); validate_bthread_concurrency);
static bool validate_bthread_min_concurrency(const char*, int32_t val);
const int ALLOW_UNUSED register_FLAGS_bthread_min_concurrency =
::google::RegisterFlagValidator(&FLAGS_bthread_min_concurrency,
validate_bthread_min_concurrency);
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match); BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match);
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
...@@ -70,7 +82,10 @@ inline TaskControl* get_or_new_task_control() { ...@@ -70,7 +82,10 @@ inline TaskControl* get_or_new_task_control() {
if (NULL == c) { if (NULL == c) {
return NULL; return NULL;
} }
if (c->init(FLAGS_bthread_concurrency) != 0) { int concurrency = FLAGS_bthread_min_concurrency > 0 ?
FLAGS_bthread_min_concurrency :
FLAGS_bthread_concurrency;
if (c->init(concurrency) != 0) {
LOG(ERROR) << "Fail to init g_task_control"; LOG(ERROR) << "Fail to init g_task_control";
delete c; delete c;
return NULL; return NULL;
...@@ -79,6 +94,27 @@ inline TaskControl* get_or_new_task_control() { ...@@ -79,6 +94,27 @@ inline TaskControl* get_or_new_task_control() {
return c; return c;
} }
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
if (val <= 0) {
return true;
}
if (val < BTHREAD_MIN_CONCURRENCY || val > FLAGS_bthread_concurrency) {
return false;
}
TaskControl* c = get_task_control();
if (!c) {
return true;
}
BAIDU_SCOPED_LOCK(g_task_control_mutex);
int concurrency = c->concurrency();
if (val > concurrency) {
int added = c->add_workers(val - concurrency);
return added == (val - concurrency);
} else {
return true;
}
}
__thread TaskGroup* tls_task_group_nosignal = NULL; __thread TaskGroup* tls_task_group_nosignal = NULL;
BUTIL_FORCE_INLINE int BUTIL_FORCE_INLINE int
...@@ -230,6 +266,16 @@ int bthread_setconcurrency(int num) { ...@@ -230,6 +266,16 @@ int bthread_setconcurrency(int num) {
LOG(ERROR) << "Invalid concurrency=" << num; LOG(ERROR) << "Invalid concurrency=" << num;
return EINVAL; return EINVAL;
} }
if (bthread::FLAGS_bthread_min_concurrency > 0) {
if (num < bthread::FLAGS_bthread_min_concurrency) {
return EINVAL;
}
if (bthread::never_set_bthread_concurrency) {
bthread::never_set_bthread_concurrency = false;
}
bthread::FLAGS_bthread_concurrency = num;
return 0;
}
bthread::TaskControl* c = bthread::get_task_control(); bthread::TaskControl* c = bthread::get_task_control();
if (c != NULL) { if (c != NULL) {
if (num < c->concurrency()) { if (num < c->concurrency()) {
......
...@@ -38,6 +38,10 @@ DEFINE_int32(task_group_yield_before_idle, 0, ...@@ -38,6 +38,10 @@ DEFINE_int32(task_group_yield_before_idle, 0,
namespace bthread { namespace bthread {
DECLARE_int32(bthread_concurrency);
DECLARE_int32(bthread_min_concurrency);
extern pthread_mutex_t g_task_control_mutex;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
void (*g_worker_startfn)() = NULL; void (*g_worker_startfn)() = NULL;
...@@ -375,6 +379,15 @@ void TaskControl::signal_task(int num_task) { ...@@ -375,6 +379,15 @@ void TaskControl::signal_task(int num_task) {
num_task -= _pl[start_index].signal(1); num_task -= _pl[start_index].signal(1);
} }
} }
if (num_task > 0 &&
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1);
}
}
} }
void TaskControl::print_rq_sizes(std::ostream& os) { void TaskControl::print_rq_sizes(std::ostream& os) {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
// Date: Sun Jul 13 15:04:18 CST 2014 // Date: Sun Jul 13 15:04:18 CST 2014
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <gflags/gflags.h>
#include "butil/atomicops.h" #include "butil/atomicops.h"
#include "butil/time.h" #include "butil/time.h"
#include "butil/macros.h" #include "butil/macros.h"
...@@ -11,6 +12,11 @@ ...@@ -11,6 +12,11 @@
#include <bthread/butex.h> #include <bthread/butex.h>
#include "butil/logging.h" #include "butil/logging.h"
#include "bthread/bthread.h" #include "bthread/bthread.h"
#include "bthread/task_control.h"
namespace bthread {
extern TaskControl* g_task_control;
}
namespace { namespace {
void* dummy(void*) { void* dummy(void*) {
...@@ -105,4 +111,71 @@ TEST(BthreadTest, setconcurrency_with_running_bthread) { ...@@ -105,4 +111,71 @@ TEST(BthreadTest, setconcurrency_with_running_bthread) {
//ASSERT_EQ(N, npthreads); //ASSERT_EQ(N, npthreads);
LOG(INFO) << "Touched pthreads=" << npthreads; LOG(INFO) << "Touched pthreads=" << npthreads;
} }
void* sleep_proc(void*) {
usleep(100000);
return NULL;
}
void* add_concurrency_proc(void*) {
bthread_t tid;
bthread_start_background(&tid, &BTHREAD_ATTR_SMALL, sleep_proc, NULL);
bthread_join(tid, NULL);
return NULL;
}
bool set_min_concurrency(int num) {
std::stringstream ss;
ss << num;
std::string ret = google::SetCommandLineOption("bthread_min_concurrency", ss.str().c_str());
return !ret.empty();
}
int get_min_concurrency() {
std::string ret;
google::GetCommandLineOption("bthread_min_concurrency", &ret);
return atoi(ret.c_str());
}
TEST(BthreadTest, min_concurrency) {
ASSERT_EQ(1, set_min_concurrency(-1)); // set min success
ASSERT_EQ(1, set_min_concurrency(0)); // set min success
ASSERT_EQ(0, get_min_concurrency());
int conn = bthread_getconcurrency();
int add_conn = 100;
ASSERT_EQ(0, set_min_concurrency(conn + 1)); // set min failed
ASSERT_EQ(0, get_min_concurrency());
ASSERT_EQ(1, set_min_concurrency(conn - 1)); // set min success
ASSERT_EQ(conn - 1, get_min_concurrency());
ASSERT_EQ(EINVAL, bthread_setconcurrency(conn - 2)); // set max failed
ASSERT_EQ(0, bthread_setconcurrency(conn + add_conn + 1)); // set max success
ASSERT_EQ(0, bthread_setconcurrency(conn + add_conn)); // set max success
ASSERT_EQ(conn + add_conn, bthread_getconcurrency());
ASSERT_EQ(conn, bthread::g_task_control->concurrency());
ASSERT_EQ(1, set_min_concurrency(conn + 1)); // set min success
ASSERT_EQ(conn + 1, get_min_concurrency());
ASSERT_EQ(conn + 1, bthread::g_task_control->concurrency());
std::vector<bthread_t> tids;
for (int i = 0; i < conn; ++i) {
bthread_t tid;
bthread_start_background(&tid, &BTHREAD_ATTR_SMALL, sleep_proc, NULL);
tids.push_back(tid);
}
for (int i = 0; i < add_conn; ++i) {
bthread_t tid;
bthread_start_background(&tid, &BTHREAD_ATTR_SMALL, add_concurrency_proc, NULL);
tids.push_back(tid);
}
for (size_t i = 0; i < tids.size(); ++i) {
bthread_join(tids[i], NULL);
}
ASSERT_EQ(conn + add_conn, bthread_getconcurrency());
ASSERT_EQ(conn + add_conn, bthread::g_task_control->concurrency());
}
} // namespace } // namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment