Commit 6ae39ecd authored by TousakaRin's avatar TousakaRin

Add gflag: auto_cl_min_explore_ratio, fix code style

parent b0854df1
...@@ -48,20 +48,25 @@ DEFINE_bool(auto_cl_enable_error_punish, true, ...@@ -48,20 +48,25 @@ DEFINE_bool(auto_cl_enable_error_punish, true,
DEFINE_double(auto_cl_fail_punish_ratio, 1.0, DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger " "Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy."); "the configuration item, the more aggressive the penalty strategy.");
DEFINE_double(auto_cl_max_reserved_ratio, 0.3, DEFINE_double(auto_cl_max_explore_ratio, 0.3,
"The larger the value, the higher the tolerance of the server to " "The larger the value, the higher the tolerance of the server to "
"the fluctuation of latency at low load, and the greater the " "the fluctuation of latency at low load, and the greater the "
"maximum growth rate of qps. Correspondingly, the server will have " "maximum growth rate of qps. Correspondingly, the server will have "
"a higher latency for a short period of time after the overload."); "a higher latency for a short period of time after the overload.");
DEFINE_double(auto_cl_change_rate_of_reserved_ratio, 0.01, DEFINE_double(auto_cl_min_explore_ratio, 0.06,
"The speed of change of auto_cl_max_reserved_ratio when the " "Auto concurrency limiter will perform fault tolerance based on "
"this parameter when judging the load situation of the server. "
"It should be a positive value close to 0; the larger it is, "
"the higher the latency of the server at full load.");
DEFINE_double(auto_cl_change_rate_of_explore_ratio, 0.02,
"The speed of change of auto_cl_max_explore_ratio when the "
"load situation of the server changes. The value range is " "load situation of the server changes. The value range is "
"(0 - `max_reserved_ratio')"); "(0 - `max_explore_ratio')");
DEFINE_double(auto_cl_reduce_ratio_while_remeasure, 0.9, DEFINE_double(auto_cl_reduce_ratio_while_remeasure, 0.9,
"This value affects the reduction ratio to mc during retesting " "This value affects the reduction ratio to mc during retesting "
"noload_latency. The value range is (0-1)"); "noload_latency. The value range is (0-1)");
DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1, DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
"Affect the judgment of the server's load situation. The larger " "Affect the judgement of the server's load situation. The larger "
"the value, the higher the tolerance for the fluctuation of the " "the value, the higher the tolerance for the fluctuation of the "
"latency. If the value is too large, the latency will be higher " "latency. If the value is too large, the latency will be higher "
"when the server is overloaded."); "when the server is overloaded.");
...@@ -72,7 +77,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter() ...@@ -72,7 +77,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter()
, _reset_latency_us(0) , _reset_latency_us(0)
, _min_latency_us(-1) , _min_latency_us(-1)
, _ema_max_qps(-1) , _ema_max_qps(-1)
, _reserved_ratio(FLAGS_auto_cl_max_reserved_ratio) , _explore_ratio(FLAGS_auto_cl_max_explore_ratio)
, _last_sampling_time_us(0) , _last_sampling_time_us(0)
, _total_succ_req(0) { , _total_succ_req(0) {
} }
...@@ -102,7 +107,18 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { ...@@ -102,7 +107,18 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong( bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed); last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) { if (sample_this_call) {
AddSample(error_code, latency_us, now_time_us); bool sample_window_submitted = AddSample(error_code, latency_us,
now_time_us);
if (sample_window_submitted) {
// The following log print has a data race in extreme cases;
// unless you are debugging, you should not enable it.
VLOG(1)
<< "Sample window submitted, current max_concurrency:"
<< _max_concurrency
<< ", min_latency_us:" << _min_latency_us
<< ", ema_max_qps:" << _ema_max_qps
<< ", explore_ratio:" << _explore_ratio;
}
} }
} }
} }
...@@ -118,7 +134,7 @@ int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) { ...@@ -118,7 +134,7 @@ int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
return reset_start_us; return reset_start_us;
} }
void AutoConcurrencyLimiter::AddSample(int error_code, bool AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us, int64_t latency_us,
int64_t sampling_time_us) { int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex); std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
...@@ -126,7 +142,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code, ...@@ -126,7 +142,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
// min_latency is about to be reset soon. // min_latency is about to be reset soon.
if (_reset_latency_us > sampling_time_us) { if (_reset_latency_us > sampling_time_us) {
// ignoring samples during waiting for the deadline. // ignoring samples during waiting for the deadline.
return; return false;
} }
// Remeasure min_latency when concurrency has dropped to low load // Remeasure min_latency when concurrency has dropped to low load
_min_latency_us = -1; _min_latency_us = -1;
...@@ -154,12 +170,12 @@ void AutoConcurrencyLimiter::AddSample(int error_code, ...@@ -154,12 +170,12 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
// window, discard the entire sampling window // window, discard the entire sampling window
ResetSampleWindow(sampling_time_us); ResetSampleWindow(sampling_time_us);
} }
return; return false;
} }
if (sampling_time_us - _sw.start_time_us < if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 && FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) { _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return; return false;
} }
if(_sw.succ_count > 0) { if(_sw.succ_count > 0) {
...@@ -169,6 +185,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code, ...@@ -169,6 +185,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
_max_concurrency /= 2; _max_concurrency /= 2;
} }
ResetSampleWindow(sampling_time_us); ResetSampleWindow(sampling_time_us);
return true;
} }
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) { void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
...@@ -215,18 +232,18 @@ void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -215,18 +232,18 @@ void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
next_max_concurrency = next_max_concurrency =
std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio); std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
} else { } else {
const double epsilon = 0.05; const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
const double change_step = FLAGS_auto_cl_change_rate_of_reserved_ratio; const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
const double max_reserved_ratio = FLAGS_auto_cl_max_reserved_ratio; const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor; const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
if (avg_latency <= _min_latency_us * (1.0 + epsilon * correction_factor) || if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
qps <= _ema_max_qps / (1.0 + epsilon)) { qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
_reserved_ratio = std::min(max_reserved_ratio, _reserved_ratio + change_step); _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else { } else {
_reserved_ratio = std::max(epsilon, _reserved_ratio - change_step); _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
} }
next_max_concurrency = next_max_concurrency =
_min_latency_us * _ema_max_qps / 1000000 * (1 + _reserved_ratio); _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
} }
if (next_max_concurrency != _max_concurrency) { if (next_max_concurrency != _max_concurrency) {
......
...@@ -51,7 +51,7 @@ private: ...@@ -51,7 +51,7 @@ private:
int64_t total_succ_us; int64_t total_succ_us;
}; };
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us); bool AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(int64_t sampling_time_us); int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called // The following methods are not thread safe and can only be called
...@@ -67,7 +67,7 @@ private: ...@@ -67,7 +67,7 @@ private:
int64_t _reset_latency_us; int64_t _reset_latency_us;
int64_t _min_latency_us; int64_t _min_latency_us;
double _ema_max_qps; double _ema_max_qps;
double _reserved_ratio; double _explore_ratio;
// modified per sample. // modified per sample.
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us; butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment