Commit 86222574 authored by TousakaRin

Make variable naming and gflags more standardized and easier to understand

parent 9a0a0395
...@@ -22,36 +22,35 @@ ...@@ -22,36 +22,35 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
DEFINE_int32(auto_cl_peak_qps_window_size, 50, DEFINE_int32(auto_cl_sample_window_size_ms, 1000, "Duration of the sampling window.");
"The number of samples windows used for peak-qps calculations.");
DEFINE_double(auto_cl_sampling_interval_ms, 0.1,
"Interval for sampling request in auto concurrency limiter");
DEFINE_int32(auto_cl_sample_window_size_ms, 1000,
"Sample window size for update max concurrency in grandient "
"concurrency limiter");
DEFINE_int32(auto_cl_min_sample_count, 100, DEFINE_int32(auto_cl_min_sample_count, 100,
"Minimum sample count for update max concurrency"); "During the duration of the sampling window, if the number of "
"requests collected is less than this value, the sampling window "
"will be discarded.");
DEFINE_int32(auto_cl_max_sample_count, 500, DEFINE_int32(auto_cl_max_sample_count, 500,
"Maximum sample count for update max concurrency"); "During the duration of the sampling window, once the number of "
"requests collected is greater than this value, even if the "
"duration of the window has not ended, the max_concurrency will "
"be updated and a new sampling window will be started.");
DEFINE_int32(auto_cl_initial_max_concurrency, 40, DEFINE_int32(auto_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter"); "Initial max concurrency for grandient concurrency limiter");
DEFINE_int32(auto_cl_reset_interval_ms, 50000, DEFINE_int32(auto_cl_noload_latency_remeasure_interval_ms, 50000,
"Interval for remeasurement of noload_latency. The period of " "Interval for remeasurement of noload_latency. In the period of "
"remeasurement of noload_latency will halve max_concurrency."); "remeasurement of noload_latency will halve max_concurrency.");
DEFINE_int32(auto_cl_reset_duration_ms, 2000, DEFINE_int32(auto_cl_noload_latency_remeasure_period_ms, 2000,
"The duration of the remeasurement of noload_latency."); "The duration of the remeasurement of noload_latency.Note that "
DEFINE_int32(auto_cl_min_concurrency, 40, "Minimum value of max_concurrency"); "in the period of remeasurement, max_concurrency will be halved");
DEFINE_double(auto_cl_sampling_interval_ms, 0.1,
DEFINE_double(auto_cl_adjust_smooth, 0.9, "Interval for sampling request in auto concurrency limiter");
"Smooth coefficient for adjust the max concurrency, the value " DEFINE_double(auto_cl_alpha_factor_for_ema, 0.1,
"is 0-1, the larger the value, the smaller the amount of each " "The smoothing coefficient used in the calculation of ema, "
"change"); "the value range is 0-1. The smaller the value, the smaller "
"the effect of a single sample_window on max_concurrency.");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0, DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger " "Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy."); "the configuration item, the more aggressive the penalty strategy.");
DEFINE_double(auto_cl_overload_threshold, 0.40, DEFINE_double(auto_cl_overload_threshold, 0.3,
"Expected ratio of latency fluctuations"); "Expected ratio of latency fluctuations");
DEFINE_bool(auto_cl_enable_error_punish, true, DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency"); "Whether to consider failed requests when calculating maximum concurrency");
...@@ -60,12 +59,11 @@ static int32_t cast_max_concurrency(void* arg) { ...@@ -60,12 +59,11 @@ static int32_t cast_max_concurrency(void* arg) {
} }
AutoConcurrencyLimiter::AutoConcurrencyLimiter() AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _reset_start_us(NextResetTime()) : _reset_start_us(NextResetTime(butil::gettimeofday_us()))
, _reset_end_us(0) , _reset_end_us(0)
, _min_latency_us(-1) , _min_latency_us(-1)
, _ema_peak_qps(-1) , _ema_peak_qps(-1)
, _qps_bq(FLAGS_auto_cl_peak_qps_window_size) , _ema_factor(FLAGS_auto_cl_alpha_factor_for_ema)
, _smooth(FLAGS_auto_cl_adjust_smooth)
, _overload_threshold(FLAGS_auto_cl_overload_threshold) , _overload_threshold(FLAGS_auto_cl_overload_threshold)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency) , _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0) , _last_sampling_time_us(0)
...@@ -127,10 +125,10 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { ...@@ -127,10 +125,10 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
} }
} }
int64_t AutoConcurrencyLimiter::NextResetTime() { int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = butil::gettimeofday_us() + int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_reset_interval_ms / 2 + (FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_reset_interval_ms / 2)) * 1000; butil::fast_rand_less_than(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2)) * 1000;
return reset_start_us; return reset_start_us;
} }
...@@ -185,24 +183,29 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) { ...@@ -185,24 +183,29 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) { if (_min_latency_us <= 0) {
_min_latency_us = latency_us; _min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) { } else if (latency_us < _min_latency_us) {
_min_latency_us = _min_latency_us * _smooth + latency_us * (1 - _smooth); _min_latency_us = latency_us * _ema_factor + _min_latency_us * (1 - _ema_factor);
} }
} }
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count, void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) { int64_t sampling_time_us) {
while (!_qps_deque.empty() &&
sampling_time_us - _qps_deque.front().first >=
FLAGS_auto_cl_noload_latency_remeasure_interval_ms * 1000) {
_qps_deque.pop_front();
}
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us); double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
_qps_bq.elim_push(qps); _qps_deque.push_back(std::make_pair(sampling_time_us, qps));
double peak_qps = *(_qps_bq.bottom()); double peak_qps = 0;
for (size_t i = 0; i < _qps_bq.size(); ++i) { for (auto history_qps : _qps_deque) {
peak_qps = std::max(*(_qps_bq.bottom(i)), peak_qps); peak_qps = std::max(peak_qps, history_qps.second);
} }
if (peak_qps >= _ema_peak_qps) { if (peak_qps >= _ema_peak_qps) {
_ema_peak_qps = peak_qps; _ema_peak_qps = peak_qps;
} else { } else {
_ema_peak_qps = _ema_peak_qps * _smooth + peak_qps * (1 - _smooth); _ema_peak_qps = peak_qps * _ema_factor + _ema_peak_qps * (1 - _ema_factor);
} }
} }
...@@ -223,8 +226,8 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -223,8 +226,8 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int next_max_concurrency = 0; int next_max_concurrency = 0;
if (_reset_start_us <= sampling_time_us) { if (_reset_start_us <= sampling_time_us) {
_min_latency_us = -1; _min_latency_us = -1;
_reset_start_us = NextResetTime(); _reset_start_us = NextResetTime(sampling_time_us);
_reset_end_us = sampling_time_us + FLAGS_auto_cl_reset_duration_ms * 1000; _reset_end_us = sampling_time_us + FLAGS_auto_cl_noload_latency_remeasure_period_ms * 1000;
next_max_concurrency = _max_concurrency / 2; next_max_concurrency = _max_concurrency / 2;
} else { } else {
int32_t noload_concurrency = int32_t noload_concurrency =
...@@ -236,8 +239,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -236,8 +239,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
next_max_concurrency = noload_concurrency; next_max_concurrency = noload_concurrency;
} }
} }
next_max_concurrency =
std::max(next_max_concurrency, FLAGS_auto_cl_min_concurrency);
if (next_max_concurrency != _max_concurrency) { if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency; _max_concurrency = next_max_concurrency;
......
...@@ -51,7 +51,7 @@ private: ...@@ -51,7 +51,7 @@ private:
}; };
int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us); int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(); int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called // The following methods are not thread safe and can only be called
// in AppSample() // in AppSample()
...@@ -66,9 +66,9 @@ private: ...@@ -66,9 +66,9 @@ private:
int64_t _reset_end_us; int64_t _reset_end_us;
int64_t _min_latency_us; int64_t _min_latency_us;
double _ema_peak_qps; double _ema_peak_qps;
butil::BoundedQueue<double> _qps_bq; std::deque<std::pair<int64_t, double>> _qps_deque;
const double _smooth; const double _ema_factor;
const double _overload_threshold; const double _overload_threshold;
butil::Mutex _sw_mutex; butil::Mutex _sw_mutex;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment