Commit 9a0a0395 authored by TousakaRin's avatar TousakaRin

Unified unit: internal microseconds, flags using milliseconds

......@@ -24,7 +24,7 @@ namespace policy {
DEFINE_int32(auto_cl_peak_qps_window_size, 50,
"The number of samples windows used for peak-qps calculations.");
DEFINE_int32(auto_cl_sampling_interval_us, 100,
DEFINE_double(auto_cl_sampling_interval_ms, 0.1,
"Interval for sampling request in auto concurrency limiter");
DEFINE_int32(auto_cl_sample_window_size_ms, 1000,
"Sample window size for update max concurrency in grandient "
......@@ -60,8 +60,8 @@ static int32_t cast_max_concurrency(void* arg) {
}
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _reset_start_ms(NextResetTime())
, _reset_end_ms(0)
: _reset_start_us(NextResetTime())
, _reset_end_us(0)
, _min_latency_us(-1)
, _ema_peak_qps(-1)
, _qps_bq(FLAGS_auto_cl_peak_qps_window_size)
......@@ -112,7 +112,7 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
if (last_sampling_time_us == 0 ||
now_time_us - last_sampling_time_us >=
FLAGS_auto_cl_sampling_interval_us) {
FLAGS_auto_cl_sampling_interval_ms * 1000) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_weak(
last_sampling_time_us, now_time_us,
butil::memory_order_relaxed);
......@@ -121,17 +121,17 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
if (max_concurrency != 0) {
LOG(INFO)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency << " " << _min_latency_us;
<< "current_max_concurrency:" << max_concurrency;
}
}
}
}
int64_t AutoConcurrencyLimiter::NextResetTime() {
int64_t reset_start_ms = butil::gettimeofday_ms() +
FLAGS_auto_cl_reset_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_reset_interval_ms / 2);
return reset_start_ms;
int64_t reset_start_us = butil::gettimeofday_us() +
(FLAGS_auto_cl_reset_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_reset_interval_ms / 2)) * 1000;
return reset_start_us;
}
int32_t AutoConcurrencyLimiter::AddSample(int error_code,
......@@ -151,12 +151,11 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
_sw.total_succ_us += latency_us;
}
if (_sw.succ_count + _sw.failed_count <
FLAGS_auto_cl_min_sample_count ||
(sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms &&
_sw.succ_count + _sw.failed_count <
FLAGS_auto_cl_max_sample_count)) {
if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
return 0;
}
if (sampling_time_us - _sw.start_time_us < FLAGS_auto_cl_sample_window_size_ms &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return 0;
}
......@@ -217,40 +216,25 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
if (_reset_end_ms > sampling_time_us / 1000) {
if (_reset_end_us > sampling_time_us) {
return 0;
}
int next_max_concurrency = 0;
if (_reset_start_ms <= sampling_time_us / 1000) {
if (_reset_start_us <= sampling_time_us) {
_min_latency_us = -1;
_reset_start_ms = NextResetTime();
_reset_end_ms = sampling_time_us/1000 + FLAGS_auto_cl_reset_duration_ms;
_reset_start_us = NextResetTime();
_reset_end_us = sampling_time_us + FLAGS_auto_cl_reset_duration_ms * 1000;
next_max_concurrency = _max_concurrency / 2;
} else {
int32_t noload_concurrency = std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
if (avg_latency < (1.0 + _overload_threshold) * _min_latency_us) {
// LOG(INFO) << "<<<" << _min_latency_us << " " << avg_latency << " " << _ema_peak_qps << " " << noload_concurrency;
next_max_concurrency = std::ceil(noload_concurrency * (2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
next_max_concurrency = std::ceil(noload_concurrency *
(2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
} else {
// LOG(INFO) << ">>>" << _min_latency_us << " " << avg_latency << " " << _ema_peak_qps << " " << noload_concurrency;
next_max_concurrency = noload_concurrency;
}
// if (avg_latency > (1.0 + _overload_threshold) * _min_latency_us) {
// next_max_concurrency = noload_concurrency;
// // LOG(INFO) << " >>> " << next_max_concurrency << " " << _min_latency_us / 1000;
// } else if (avg_latency < (1.0 + _noload_threshold) * _min_latency_us) {
// next_max_concurrency =
// noload_concurrency * (1.0 + _overload_threshold);
// // LOG(INFO) << " <<< " << next_max_concurrency << " " << _min_latency_us / 1000;
// } else {
// next_max_concurrency =
// noload_concurrency * (2.0 + _overload_threshold -
// double(avg_latency) / _min_latency_us);
// // LOG(INFO) << " --- " << next_max_concurrency << " " << _min_latency_us / 1000;
// }
}
next_max_concurrency =
std::max(next_max_concurrency, FLAGS_auto_cl_min_concurrency);
......
......@@ -62,8 +62,8 @@ private:
double peak_qps();
SampleWindow _sw;
int64_t _reset_start_ms;
int64_t _reset_end_ms;
int64_t _reset_start_us;
int64_t _reset_end_us;
int64_t _min_latency_us;
double _ema_peak_qps;
butil::BoundedQueue<double> _qps_bq;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment