Commit 0410cbcf authored by TousakaRin's avatar TousakaRin

Modify the calculation formula of max_concurrency

parent 9d5ca591
......@@ -22,7 +22,8 @@
namespace brpc {
namespace policy {
DEFINE_int32(auto_cl_peak_qps_window_size, 30, "");
DEFINE_int32(auto_cl_peak_qps_window_size, 50,
"The number of samples windows used for peak-qps calculations.");
DEFINE_int32(auto_cl_sampling_interval_us, 100,
"Interval for sampling request in auto concurrency limiter");
DEFINE_int32(auto_cl_sample_window_size_ms, 1000,
......@@ -30,32 +31,42 @@ DEFINE_int32(auto_cl_sample_window_size_ms, 1000,
"concurrency limiter");
DEFINE_int32(auto_cl_min_sample_count, 100,
"Minimum sample count for update max concurrency");
DEFINE_double(auto_cl_adjust_smooth, 0.9,
"Smooth coefficient for adjust the max concurrency, the value is 0-1,"
"the larger the value, the smaller the amount of each change");
DEFINE_int32(auto_cl_max_sample_count, 500,
"Maximum sample count for update max concurrency");
DEFINE_int32(auto_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter");
DEFINE_int32(auto_cl_reset_interval_ms, 50000,
"Interval for remeasurement of noload_latency. The period of "
"remeasurement of noload_latency will halve max_concurrency.");
DEFINE_int32(auto_cl_reset_duration_ms, 2000,
"The duration of the remeasurement of noload_latency.");
DEFINE_int32(auto_cl_min_concurrency, 40, "Minimum value of max_concurrency");
DEFINE_double(auto_cl_adjust_smooth, 0.9,
"Smooth coefficient for adjust the max concurrency, the value "
"is 0-1, the larger the value, the smaller the amount of each "
"change");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
DEFINE_double(auto_cl_overload_threshold, 0.40,
"Expected ratio of latency fluctuations");
DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger the "
"configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(auto_cl_min_reserved_concurrency, 10, "");
DEFINE_int32(auto_cl_reset_count, 30,
"The service's latency will be re-measured every `reset_count' windows.");
DEFINE_double(auto_cl_latency_fluctuate_rate, 0.4, "");
static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg;
}
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _reset_count(NextResetCount())
: _reset_start_ms(NextResetTime())
, _reset_end_ms(0)
, _min_latency_us(-1)
, _smooth(FLAGS_auto_cl_adjust_smooth)
, _ema_peak_qps(-1)
, _rest_noload_count(0)
, _qps_bq(FLAGS_auto_cl_peak_qps_window_size)
, _smooth(FLAGS_auto_cl_adjust_smooth)
, _overload_threshold(FLAGS_auto_cl_overload_threshold)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _total_succ_req(0)
......@@ -110,15 +121,17 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
if (max_concurrency != 0) {
LOG(INFO)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency;
<< "current_max_concurrency:" << max_concurrency << " " << _min_latency_us;
}
}
}
}
int AutoConcurrencyLimiter::NextResetCount() {
int max_reset_count = FLAGS_auto_cl_reset_count;
return butil::fast_rand_less_than(max_reset_count / 2) + max_reset_count / 2;
int64_t AutoConcurrencyLimiter::NextResetTime() {
int64_t reset_start_ms = butil::gettimeofday_ms() +
FLAGS_auto_cl_reset_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_reset_interval_ms / 2);
return reset_start_ms;
}
int32_t AutoConcurrencyLimiter::AddSample(int error_code,
......@@ -138,10 +151,12 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
_sw.total_succ_us += latency_us;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 ||
if (_sw.succ_count + _sw.failed_count <
FLAGS_auto_cl_min_sample_count ||
(sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms &&
_sw.succ_count + _sw.failed_count <
FLAGS_auto_cl_min_sample_count) {
FLAGS_auto_cl_max_sample_count)) {
return 0;
}
......@@ -184,7 +199,12 @@ void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
for (size_t i = 0; i < _qps_bq.size(); ++i) {
peak_qps = std::max(*(_qps_bq.bottom(i)), peak_qps);
}
if (peak_qps >= _ema_peak_qps) {
_ema_peak_qps = peak_qps;
} else {
_ema_peak_qps = _ema_peak_qps * _smooth + peak_qps * (1 - _smooth);
}
}
int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
......@@ -197,30 +217,43 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
if (_rest_noload_count > 0) {
--_rest_noload_count;
if (_reset_end_ms > sampling_time_us / 1000) {
return 0;
}
int next_max_concurrency = 0;
if (--_reset_count == 0) {
if (_reset_start_ms <= sampling_time_us / 1000) {
_min_latency_us = -1;
_reset_count = NextResetCount();
_rest_noload_count = std::ceil(
double(avg_latency) / FLAGS_auto_cl_sample_window_size_ms / 1000);
_reset_start_ms = NextResetTime();
_reset_end_ms = sampling_time_us/1000 + FLAGS_auto_cl_reset_duration_ms;
next_max_concurrency = _max_concurrency / 2;
} else {
int32_t noload_concurrency = _ema_peak_qps * _min_latency_us / 1000000.0;
if (avg_latency > (1 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us) {
next_max_concurrency = noload_concurrency;
int32_t noload_concurrency = std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
if (avg_latency < (1.0 + _overload_threshold) * _min_latency_us) {
// LOG(INFO) << "<<<" << _min_latency_us << " " << avg_latency << " " << _ema_peak_qps << " " << noload_concurrency;
next_max_concurrency = std::ceil(noload_concurrency * (2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
} else {
next_max_concurrency =
std::ceil(_ema_peak_qps * ((2 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us - avg_latency) / 1000000.0 );
if (avg_latency <= (1 + FLAGS_auto_cl_latency_fluctuate_rate) * _min_latency_us) {
next_max_concurrency = std::max(next_max_concurrency, noload_concurrency + FLAGS_auto_cl_min_reserved_concurrency);
}
// LOG(INFO) << ">>>" << _min_latency_us << " " << avg_latency << " " << _ema_peak_qps << " " << noload_concurrency;
next_max_concurrency = noload_concurrency;
}
// if (avg_latency > (1.0 + _overload_threshold) * _min_latency_us) {
// next_max_concurrency = noload_concurrency;
// // LOG(INFO) << " >>> " << next_max_concurrency << " " << _min_latency_us / 1000;
// } else if (avg_latency < (1.0 + _noload_threshold) * _min_latency_us) {
// next_max_concurrency =
// noload_concurrency * (1.0 + _overload_threshold);
// // LOG(INFO) << " <<< " << next_max_concurrency << " " << _min_latency_us / 1000;
// } else {
// next_max_concurrency =
// noload_concurrency * (2.0 + _overload_threshold -
// double(avg_latency) / _min_latency_us);
// // LOG(INFO) << " --- " << next_max_concurrency << " " << _min_latency_us / 1000;
// }
}
next_max_concurrency =
std::max(next_max_concurrency, FLAGS_auto_cl_min_concurrency);
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
......
......@@ -51,7 +51,7 @@ private:
};
int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int NextResetCount();
int64_t NextResetTime();
// The following methods are not thread safe and can only be called
// in AppSample()
......@@ -62,12 +62,15 @@ private:
double peak_qps();
SampleWindow _sw;
int _reset_count;
int64_t _reset_start_ms;
int64_t _reset_end_ms;
int64_t _min_latency_us;
const double _smooth;
double _ema_peak_qps;
int _rest_noload_count;
butil::BoundedQueue<double> _qps_bq;
const double _smooth;
const double _overload_threshold;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
......
......@@ -876,7 +876,7 @@ int Server::StartInternal(const butil::ip_t& ip,
static_cast<int>(_options.max_concurrency) != 0) {
_cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
_options.max_concurrency);
_cl->Expose("Global_Concurrency_Limiter");
_cl->Expose("Server_Concurrency_Limiter");
} else {
if (_cl) {
_cl->Destroy();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment