Commit 379caa1f authored by TousakaRin's avatar TousakaRin

Fix bug: units are not unified, causing window calculation errors, fix code style

parent 362c5d02
...@@ -108,13 +108,12 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { ...@@ -108,13 +108,12 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
if (last_sampling_time_us == 0 || if (last_sampling_time_us == 0 ||
now_time_us - last_sampling_time_us >= now_time_us - last_sampling_time_us >=
FLAGS_auto_cl_sampling_interval_ms * 1000) { FLAGS_auto_cl_sampling_interval_ms * 1000) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_weak( bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
butil::memory_order_relaxed);
if (sample_this_call) { if (sample_this_call) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us); int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) { if (max_concurrency != 0) {
LOG(INFO) LOG_EVERY_N(INFO, 100)
<< "MaxConcurrency updated by auto limiter," << "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency; << "current_max_concurrency:" << max_concurrency;
} }
...@@ -125,20 +124,19 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { ...@@ -125,20 +124,19 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) { int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us + int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 + (FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2)) * 1000; butil::fast_rand_less_than(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2)) * 1000;
return reset_start_us; return reset_start_us;
} }
int32_t AutoConcurrencyLimiter::AddSample(int error_code, int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us, int64_t latency_us,
int64_t sampling_time_us) { int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex); std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
if (_sw.start_time_us == 0) { if (_sw.start_time_us == 0) {
_sw.start_time_us = sampling_time_us; _sw.start_time_us = sampling_time_us;
} }
if (error_code != 0 && if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
FLAGS_auto_cl_enable_error_punish) {
++_sw.failed_count; ++_sw.failed_count;
_sw.total_failed_us += latency_us; _sw.total_failed_us += latency_us;
} else if (error_code == 0) { } else if (error_code == 0) {
...@@ -147,9 +145,16 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code, ...@@ -147,9 +145,16 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
} }
if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) { if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
if (sampling_time_us - _sw.sampling_time_us >=
FLAGS_auto_cl_sample_window_size_ms * 1000) {
// If the sample size is insufficient at the end of the sampling
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return 0; return 0;
} }
if (sampling_time_us - _sw.start_time_us < FLAGS_auto_cl_sample_window_size_ms && if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) { _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return 0; return 0;
} }
...@@ -221,7 +226,7 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -221,7 +226,7 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
return 0; return 0;
} }
// Remeasure min_latency when concurrency has dropped to low load // Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0 && _reset_latency_us < sampling_time_us) { if (_reset_latency_us > 0 && _reset_latency_us <= sampling_time_us) {
_min_latency_us = -1; _min_latency_us = -1;
_reset_latency_us = 0; _reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us); _remeasure_start_us = NextResetTime(sampling_time_us);
...@@ -233,7 +238,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { ...@@ -233,7 +238,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
if (_remeasure_start_us <= sampling_time_us) { if (_remeasure_start_us <= sampling_time_us) {
_reset_latency_us = sampling_time_us + avg_latency; _reset_latency_us = sampling_time_us + avg_latency;
next_max_concurrency = _max_concurrency / 2; next_max_concurrency = _max_concurrency / 2;
LOG(INFO) << "Prepare" << _max_concurrency;
} else { } else {
int32_t noload_concurrency = int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000); std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment