Commit 2f590bf9 authored by TousakaRin's avatar TousakaRin

Optimize the calculation of qps and min_latency

parent fc237e58
......@@ -113,9 +113,9 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
if (sample_this_call) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) {
LOG_EVERY_N(INFO, 100)
LOG_EVERY_N(INFO, 60)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency;
<< "current_max_concurrency:" << max_concurrency << " " << _min_latency_us;
}
}
}
......@@ -132,6 +132,19 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
// Waiting for the current concurrent decline
if (_reset_latency_us > sampling_time_us) {
return 0;
}
// Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0) {
_min_latency_us = -1;
_reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us);
ResetSampleWindow(sampling_time_us);
}
if (_sw.start_time_us == 0) {
_sw.start_time_us = sampling_time_us;
}
......@@ -165,10 +178,8 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
return max_concurrency;
} else {
// All request failed
int32_t current_concurrency =
_current_concurrency.load(butil::memory_order_relaxed);
_current_concurrency.store(
current_concurrency / 2, butil::memory_order_relaxed);
_max_concurrency /= 2;
ResetSampleWindow(sampling_time_us);
return 0;
}
}
......@@ -191,23 +202,13 @@ void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
while (!_qps_deque.empty() &&
sampling_time_us - _qps_deque.front().first >=
FLAGS_auto_cl_noload_latency_remeasure_interval_ms * 1000) {
_qps_deque.pop_front();
}
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
_qps_deque.push_back(std::make_pair(sampling_time_us, qps));
double peak_qps = 0;
for (auto history_qps : _qps_deque) {
peak_qps = std::max(peak_qps, history_qps.second);
}
if (peak_qps >= _ema_peak_qps) {
_ema_peak_qps = peak_qps;
if (qps >= _ema_peak_qps) {
_ema_peak_qps = qps;
} else {
_ema_peak_qps = peak_qps * _ema_factor + _ema_peak_qps * (1 - _ema_factor);
_ema_peak_qps =
qps * (_ema_factor / 10) + _ema_peak_qps * (1 - _ema_factor / 10);
}
}
......@@ -221,23 +222,12 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
// Waiting for the current concurrent decline
if (_reset_latency_us > sampling_time_us) {
return 0;
}
// Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0 && _reset_latency_us <= sampling_time_us) {
_min_latency_us = -1;
_reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us);
return 0;
}
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
_reset_latency_us = sampling_time_us + avg_latency;
next_max_concurrency = _max_concurrency / 2;
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 0.75;
} else {
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
......
......@@ -66,7 +66,6 @@ private:
int64_t _reset_latency_us;
int64_t _min_latency_us;
double _ema_peak_qps;
std::deque<std::pair<int64_t, double>> _qps_deque;
const double _ema_factor;
const double _overload_threshold;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment