Unverified Commit 33538c6b authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #459 from TousakaRin/auto_concurrency_limiter

Auto concurrency limiter
parents 05227912 34f38860
--case_file=test_case_test
#client settings
--case_file=test_case.json
--client_qps_change_interval_us=50000
--max_retry=0
--auto_cl_overload_threshold=0.3
--auto_cl_initial_max_concurrency=16
#auto_cl settings
--auto_cl_initial_max_concurrency=40
--auto_cl_max_explore_ratio=0.3
--auto_cl_min_explore_ratio=0.06
--auto_cl_change_rate_of_explore_ratio=0.02
--auto_cl_reduce_ratio_while_remeasure=0.9
--auto_cl_latency_fluctuation_correction_factor=2
#server setings for async sleep
--latency_change_interval_us=50000
--server_bthread_concurrency=4
--server_sync_sleep_us=2500
--use_usleep=false
#server setings for sync sleep
#--latency_change_interval_us=50000
#--server_bthread_concurrency=16
#--server_max_concurrency=15
#--server_sync_sleep_us=2500
#--use_usleep=true
--case_file=test_case_test
--client_qps_change_interval_us=50000
--max_retry=0
--auto_cl_overload_threshold=0.3
--auto_cl_initial_max_concurrency=16
--latency_change_interval_us=50000
--server_bthread_concurrency=16
--server_max_concurrency=15
--server_sync_sleep_us=2500
--use_usleep=true
......@@ -29,12 +29,6 @@
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":1500,
"duration_sec":10,
"type":2
},
{
"lower_bound":1500,
"upper_bound":1500,
......@@ -59,12 +53,6 @@
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":300,
"duration_sec":10,
"type":2
},
{
"lower_bound":300,
"upper_bound":1800,
......@@ -89,12 +77,6 @@
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":3000,
"duration_sec":20,
"type":2
},
{
"lower_bound":3000,
"upper_bound":3000,
......@@ -118,12 +100,6 @@
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":3000,
"duration_sec":20,
"type":2
},
{
"lower_bound":3000,
"upper_bound":3000,
......@@ -213,70 +189,65 @@
]
},
{
"case_name":"qps_fluctuate_noload, latency_fluctuate_noload",
"case_name":"qps_stable_noload, latency_leap_raise",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":300,
"duration_sec":10,
"lower_bound":300,
"upper_bound":1800,
"duration_sec":20,
"type":2
},
{
"lower_bound":300,
"lower_bound":1800,
"upper_bound":1800,
"duration_sec":190,
"type":1
"duration_sec":220,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":30000,
"duration_sec":100,
"type":2
},
{
"lower_bound":50000,
"upper_bound":50000,
"duration_sec":200,
"type":1
"duration_sec":100,
"type":2
}
]
},
{
"case_name":"qps_stable_noload, latency_leap_raise",
"case_name":"qps_fluctuate_noload, latency_fluctuate_noload",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":300,
"upper_bound":1800,
"duration_sec":20,
"type":2
},
{
"lower_bound":1800,
"upper_bound":1800,
"duration_sec":220,
"type":2
"duration_sec":190,
"type":1
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":30000,
"duration_sec":100,
"type":2
},
{
"lower_bound":50000,
"upper_bound":50000,
"duration_sec":100,
"type":2
"duration_sec":200,
"type":1
}
]
}
]}
......@@ -43,13 +43,33 @@ DEFINE_double(auto_cl_alpha_factor_for_ema, 0.1,
"The smoothing coefficient used in the calculation of ema, "
"the value range is 0-1. The smaller the value, the smaller "
"the effect of a single sample_window on max_concurrency.");
DEFINE_double(auto_cl_overload_threshold, 0.3,
"Expected ratio of latency fluctuations");
DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
DEFINE_double(auto_cl_max_explore_ratio, 0.3,
"The larger the value, the higher the tolerance of the server to "
"the fluctuation of latency at low load, and the the greater the "
"maximum growth rate of qps. Correspondingly, the server will have "
"a higher latency for a short period of time after the overload.");
DEFINE_double(auto_cl_min_explore_ratio, 0.06,
"Auto concurrency limiter will perform fault tolerance based on "
"this parameter when judging the load situation of the server. "
"It should be a positive value close to 0, the larger it is, "
"the higher the latency of the server at full load.");
DEFINE_double(auto_cl_change_rate_of_explore_ratio, 0.02,
"The speed of change of auto_cl_max_explore_ratio when the "
"load situation of the server changes, The value range is "
"(0 - `max_explore_ratio')");
DEFINE_double(auto_cl_reduce_ratio_while_remeasure, 0.9,
"This value affects the reduction ratio to mc during retesting "
"noload_latency. The value range is (0-1)");
DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
"Affect the judgement of the server's load situation. The larger "
"the value, the higher the tolerance for the fluctuation of the "
"latency. If the value is too large, the latency will be higher "
"when the server is overloaded.");
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
......@@ -57,6 +77,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter()
, _reset_latency_us(0)
, _min_latency_us(-1)
, _ema_max_qps(-1)
, _explore_ratio(FLAGS_auto_cl_max_explore_ratio)
, _last_sampling_time_us(0)
, _total_succ_req(0) {
}
......@@ -86,7 +107,18 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) {
AddSample(error_code, latency_us, now_time_us);
bool sample_window_submitted = AddSample(error_code, latency_us,
now_time_us);
if (sample_window_submitted) {
// The following log prints has data-race in extreme cases,
// unless you are in debug, you should not open it.
VLOG(1)
<< "Sample window submitted, current max_concurrency:"
<< _max_concurrency
<< ", min_latency_us:" << _min_latency_us
<< ", ema_max_qps:" << _ema_max_qps
<< ", explore_ratio:" << _explore_ratio;
}
}
}
}
......@@ -102,7 +134,7 @@ int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
return reset_start_us;
}
void AutoConcurrencyLimiter::AddSample(int error_code,
bool AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
......@@ -110,7 +142,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
// min_latency is about to be reset soon.
if (_reset_latency_us > sampling_time_us) {
// ignoring samples during waiting for the deadline.
return;
return false;
}
// Remeasure min_latency when concurrency has dropped to low load
_min_latency_us = -1;
......@@ -138,12 +170,12 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return;
return false;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return;
return false;
}
if(_sw.succ_count > 0) {
......@@ -153,9 +185,11 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
_max_concurrency /= 2;
}
ResetSampleWindow(sampling_time_us);
return true;
}
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_total_succ_req.exchange(0, butil::memory_order_relaxed);
_sw.start_time_us = sampling_time_us;
_sw.succ_count = 0;
_sw.failed_count = 0;
......@@ -172,9 +206,7 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
}
}
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
void AutoConcurrencyLimiter::UpdateQps(double qps) {
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
if (qps >= _ema_max_qps) {
_ema_max_qps = qps;
......@@ -184,30 +216,34 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
}
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish =
_sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
UpdateQps(qps);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 3 / 4;
next_max_concurrency =
std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
} else {
const double overload_threshold = FLAGS_auto_cl_overload_threshold;
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_max_qps / 1000000);
if (avg_latency < (1.0 + overload_threshold) * _min_latency_us) {
next_max_concurrency = std::ceil(noload_concurrency *
(2.0 + overload_threshold - double(avg_latency) / _min_latency_us));
const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
_explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else {
next_max_concurrency = noload_concurrency;
_explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
}
next_max_concurrency =
_min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
}
if (next_max_concurrency != _max_concurrency) {
......
......@@ -51,7 +51,7 @@ private:
int64_t total_succ_us;
};
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
bool AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called
......@@ -59,7 +59,7 @@ private:
void UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
void UpdateQps(double qps);
// modified per sample-window or more
int _max_concurrency;
......@@ -67,6 +67,7 @@ private:
int64_t _reset_latency_us;
int64_t _min_latency_us;
double _ema_max_qps;
double _explore_ratio;
// modified per sample.
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment