Commit 90f3d834 authored by TousakaRin's avatar TousakaRin

Fix inaccurate bugs in qps calculation, optimization algorithm

parent ce16cdf2
...@@ -43,13 +43,30 @@ DEFINE_double(auto_cl_alpha_factor_for_ema, 0.1, ...@@ -43,13 +43,30 @@ DEFINE_double(auto_cl_alpha_factor_for_ema, 0.1,
"The smoothing coefficient used in the calculation of ema, " "The smoothing coefficient used in the calculation of ema, "
"the value range is 0-1. The smaller the value, the smaller " "the value range is 0-1. The smaller the value, the smaller "
"the effect of a single sample_window on max_concurrency."); "the effect of a single sample_window on max_concurrency.");
DEFINE_double(auto_cl_overload_threshold, 0.3,
"Expected ratio of latency fluctuations");
DEFINE_bool(auto_cl_enable_error_punish, true, DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency"); "Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0, DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger " "Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy."); "the configuration item, the more aggressive the penalty strategy.");
DEFINE_double(auto_cl_epsilon, 0.05,
"The larger the value, the more relaxed the judgment condition "
"for full load. Correspondingly, when the server is fully loaded, "
"the latency will be higher.");
DEFINE_double(auto_cl_max_reserved_ratio, 0.3,
"The larger the value, the higher the tolerance of the service to "
"the fluctuation of latency at low load, and the higher the upper "
"limit of qps increase.");
DEFINE_double(auto_cl_change_rate_of_reserved_ratio, 0.01,
"The speed of change of auto_cl_max_reserved_ratio when the "
"load situation of the server changes, The value range is "
"(0 - `max_reserved_ratio')");
DEFINE_double(auto_cl_reduce_ratio_while_remeasure, 0.9,
"This value affects the reduction ratio to mc during retesting "
"noload_latency. The value range is (0-1)");
DEFINE_int32(auto_cl_latency_fluctuation_correction_factor, 1,
"Affect the judgment of the server's load situation. The larger "
"the value, the higher the tolerance for the fluctuation of the "
"latency, and the higher the latency at full load.");
AutoConcurrencyLimiter::AutoConcurrencyLimiter() AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency) : _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
...@@ -57,6 +74,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter() ...@@ -57,6 +74,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter()
, _reset_latency_us(0) , _reset_latency_us(0)
, _min_latency_us(-1) , _min_latency_us(-1)
, _ema_max_qps(-1) , _ema_max_qps(-1)
, _reserved_ratio(FLAGS_auto_cl_max_reserved_ratio)
, _last_sampling_time_us(0) , _last_sampling_time_us(0)
, _total_succ_req(0) { , _total_succ_req(0) {
} }
...@@ -156,6 +174,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code, ...@@ -156,6 +174,7 @@ void AutoConcurrencyLimiter::AddSample(int error_code,
} }
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) { void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_total_succ_req.exchange(0, butil::memory_order_relaxed);
_sw.start_time_us = sampling_time_us; _sw.start_time_us = sampling_time_us;
_sw.succ_count = 0; _sw.succ_count = 0;
_sw.failed_count = 0; _sw.failed_count = 0;
...@@ -172,9 +191,7 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) { ...@@ -172,9 +191,7 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
} }
} }
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count, void AutoConcurrencyLimiter::UpdateQps(double qps) {
int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10; const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
if (qps >= _ema_max_qps) { if (qps >= _ema_max_qps) {
_ema_max_qps = qps; _ema_max_qps = qps;
...@@ -184,30 +201,34 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count, ...@@ -184,30 +201,34 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
} }
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req = int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
_total_succ_req.exchange(0, butil::memory_order_relaxed); double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
double failed_punish =
_sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency = int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count); std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
UpdateMinLatency(avg_latency); UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us); UpdateQps(qps);
int next_max_concurrency = 0; int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals // Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) { if (_remeasure_start_us <= sampling_time_us) {
const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
_reset_latency_us = sampling_time_us + avg_latency * 2; _reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 3 / 4; next_max_concurrency =
std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
} else { } else {
const double overload_threshold = FLAGS_auto_cl_overload_threshold; const double epsilon = FLAGS_auto_cl_epsilon;
int32_t noload_concurrency = const double change_step = FLAGS_auto_cl_change_rate_of_reserved_ratio;
std::ceil(_min_latency_us * _ema_max_qps / 1000000); const double max_reserved_ratio = FLAGS_auto_cl_max_reserved_ratio;
if (avg_latency < (1.0 + overload_threshold) * _min_latency_us) { const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
next_max_concurrency = std::ceil(noload_concurrency * if (avg_latency <= _min_latency_us * (1.0 + epsilon * correction_factor) ||
(2.0 + overload_threshold - double(avg_latency) / _min_latency_us)); qps <= _ema_max_qps / (1.0 + epsilon)) {
_reserved_ratio = std::min(max_reserved_ratio, _reserved_ratio + change_step);
} else { } else {
next_max_concurrency = noload_concurrency; _reserved_ratio = std::max(epsilon, _reserved_ratio - change_step);
} }
next_max_concurrency = _min_latency_us * _ema_max_qps / 1000000 * \
(1 + _reserved_ratio);
} }
if (next_max_concurrency != _max_concurrency) { if (next_max_concurrency != _max_concurrency) {
......
...@@ -59,7 +59,7 @@ private: ...@@ -59,7 +59,7 @@ private:
void UpdateMaxConcurrency(int64_t sampling_time_us); void UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us); void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us); void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us); void UpdateQps(double qps);
// modified per sample-window or more // modified per sample-window or more
int _max_concurrency; int _max_concurrency;
...@@ -67,6 +67,7 @@ private: ...@@ -67,6 +67,7 @@ private:
int64_t _reset_latency_us; int64_t _reset_latency_us;
int64_t _min_latency_us; int64_t _min_latency_us;
double _ema_max_qps; double _ema_max_qps;
double _reserved_ratio;
// modified per sample. // modified per sample.
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us; butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment