Commit 9d5ca591 authored by TousakaRin's avatar TousakaRin

optimized algorithm

parent ae177d1a
...@@ -40,22 +40,10 @@ DEFINE_bool(auto_cl_enable_error_punish, true, ...@@ -40,22 +40,10 @@ DEFINE_bool(auto_cl_enable_error_punish, true,
DEFINE_double(auto_cl_fail_punish_ratio, 1.0, DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger the " "Use the failed requests to punish normal requests. The larger the "
"configuration item, the more aggressive the penalty strategy."); "configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(auto_cl_reserved_concurrency, 0, DEFINE_int32(auto_cl_min_reserved_concurrency, 10, "");
"The maximum concurrency reserved when the service is not overloaded."
"When the traffic increases, the larger the configuration item, the "
"faster the maximum concurrency grows until the server is fully loaded."
"When the value is less than or equal to 0, square root of current "
"concurrency is used.");
DEFINE_int32(auto_cl_min_reserved_concurrency, 10,
"Minimum value of reserved concurrency, When the value is less than "
"or equal to 0, the minimum value of the reserved concurrency is not "
"limited.");
DEFINE_int32(auto_cl_max_reserved_concurrency, 40,
"Maximum value of reserved concurrency, When the value is less than "
"or equal to 0, the maximum value of the reserved concurrency is not "
"limited.");
DEFINE_int32(auto_cl_reset_count, 30, DEFINE_int32(auto_cl_reset_count, 30,
"The service's latency will be re-measured every `reset_count' windows."); "The service's latency will be re-measured every `reset_count' windows.");
DEFINE_double(auto_cl_latency_fluctuate_rate, 0.4, "");
static int32_t cast_max_concurrency(void* arg) { static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg; return *(int32_t*) arg;
...@@ -66,6 +54,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter() ...@@ -66,6 +54,7 @@ AutoConcurrencyLimiter::AutoConcurrencyLimiter()
, _min_latency_us(-1) , _min_latency_us(-1)
, _smooth(FLAGS_auto_cl_adjust_smooth) , _smooth(FLAGS_auto_cl_adjust_smooth)
, _ema_peak_qps(-1) , _ema_peak_qps(-1)
, _rest_noload_count(0)
, _qps_bq(FLAGS_auto_cl_peak_qps_window_size) , _qps_bq(FLAGS_auto_cl_peak_qps_window_size)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency) , _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0) , _last_sampling_time_us(0)
...@@ -120,7 +109,7 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { ...@@ -120,7 +109,7 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us); int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) { if (max_concurrency != 0) {
LOG(INFO) LOG(INFO)
<< "MaxConcurrency updated by auto limiter:" << "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency; << "current_max_concurrency:" << max_concurrency;
} }
} }
...@@ -199,7 +188,6 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count, ...@@ -199,7 +188,6 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
} }
int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t current_concurrency = _current_concurrency.load();
int32_t total_succ_req = int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed); _total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish = double failed_punish =
...@@ -209,36 +197,29 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -209,36 +197,29 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency); UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us); UpdateQps(total_succ_req, sampling_time_us);
int reserved_concurrency = FLAGS_auto_cl_reserved_concurrency; if (_rest_noload_count > 0) {
if (reserved_concurrency <= 0) { --_rest_noload_count;
reserved_concurrency = std::ceil(std::sqrt(_max_concurrency)); return 0;
}
if (FLAGS_auto_cl_min_reserved_concurrency > 0) {
reserved_concurrency = std::max(FLAGS_auto_cl_min_reserved_concurrency,
reserved_concurrency);
}
if (FLAGS_auto_cl_max_reserved_concurrency > 0) {
reserved_concurrency = std::min(FLAGS_auto_cl_max_reserved_concurrency,
reserved_concurrency);
} }
int32_t next_max_concurrency = int next_max_concurrency = 0;
std::ceil(_ema_peak_qps * _min_latency_us / 1000000.0);
if (--_reset_count == 0) { if (--_reset_count == 0) {
_reset_count = NextResetCount();
if (current_concurrency >= _max_concurrency - 2) {
_min_latency_us = -1; _min_latency_us = -1;
next_max_concurrency -= std::sqrt(_max_concurrency); _reset_count = NextResetCount();
next_max_concurrency = _rest_noload_count = std::ceil(
std::max(next_max_concurrency, reserved_concurrency); double(avg_latency) / FLAGS_auto_cl_sample_window_size_ms / 1000);
next_max_concurrency = _max_concurrency / 2;
} else { } else {
// current_concurrency < _max_concurrency means the server is int32_t noload_concurrency = _ema_peak_qps * _min_latency_us / 1000000.0;
// not overloaded and does not need to detect noload_latency by if (avg_latency > (1 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us) {
// lowering the maximum concurrency next_max_concurrency = noload_concurrency;
next_max_concurrency += reserved_concurrency;
}
} else { } else {
next_max_concurrency += reserved_concurrency; next_max_concurrency =
std::ceil(_ema_peak_qps * ((2 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us - avg_latency) / 1000000.0 );
if (avg_latency <= (1 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us) {
next_max_concurrency = std::max(next_max_concurrency, noload_concurrency + FLAGS_auto_cl_min_reserved_concurrency);
}
}
} }
if (next_max_concurrency != _max_concurrency) { if (next_max_concurrency != _max_concurrency) {
......
...@@ -66,6 +66,7 @@ private: ...@@ -66,6 +66,7 @@ private:
int64_t _min_latency_us; int64_t _min_latency_us;
const double _smooth; const double _smooth;
double _ema_peak_qps; double _ema_peak_qps;
int _rest_noload_count;
butil::BoundedQueue<double> _qps_bq; butil::BoundedQueue<double> _qps_bq;
butil::Mutex _sw_mutex; butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar; bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment