Commit 7654d8e0 authored by TousakaRin's avatar TousakaRin

Move log print out of critical section

parent aa0371ae
...@@ -119,7 +119,13 @@ void GradientConcurrencyLimiter::OnResponded(int error_code, ...@@ -119,7 +119,13 @@ void GradientConcurrencyLimiter::OnResponded(int error_code,
last_sampling_time_us, now_time_us, last_sampling_time_us, now_time_us,
butil::memory_order_relaxed); butil::memory_order_relaxed);
if (sample_this_call) { if (sample_this_call) {
int32_t max_concurrency =
AddSample(error_code, latency_us, now_time_us); AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) {
LOG(INFO)
<< "MaxConcurrency updated by gradient limiter:"
<< "current_max_concurrency:" << max_concurrency;
}
} }
} }
} }
...@@ -129,7 +135,8 @@ int GradientConcurrencyLimiter::NextResetCount() { ...@@ -129,7 +135,8 @@ int GradientConcurrencyLimiter::NextResetCount() {
return rand() % (max_reset_count / 2) + max_reset_count / 2; return rand() % (max_reset_count / 2) + max_reset_count / 2;
} }
void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us, int32_t GradientConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) { int64_t sampling_time_us) {
BAIDU_SCOPED_LOCK(_sw_mutex); BAIDU_SCOPED_LOCK(_sw_mutex);
if (_sw.start_time_us == 0) { if (_sw.start_time_us == 0) {
...@@ -150,19 +157,23 @@ void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us, ...@@ -150,19 +157,23 @@ void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
if (sampling_time_us - _sw.start_time_us < if (sampling_time_us - _sw.start_time_us <
FLAGS_gradient_cl_sample_window_size_ms * 1000) { FLAGS_gradient_cl_sample_window_size_ms * 1000) {
return; return 0;
} else if (_sw.succ_count + _sw.failed_count < }
if (_sw.succ_count + _sw.failed_count <
FLAGS_gradient_cl_min_sample_count) { FLAGS_gradient_cl_min_sample_count) {
LOG_EVERY_N(INFO, 100) << "Insufficient sample size"; LOG_EVERY_N(INFO, 100) << "Insufficient sample count";
return 0;
} else if (_sw.succ_count > 0) { } else if (_sw.succ_count > 0) {
UpdateConcurrency(sampling_time_us); int max_concurrency = UpdateMaxConcurrency(sampling_time_us);
ResetSampleWindow(sampling_time_us); ResetSampleWindow(sampling_time_us);
return max_concurrency;
} else { } else {
LOG(ERROR) << "All request failed, resize max_concurrency"; LOG(ERROR) << "All request failed, resize max_concurrency";
int32_t current_concurrency = int32_t current_concurrency =
_current_concurrency.load(butil::memory_order_relaxed); _current_concurrency.load(butil::memory_order_relaxed);
_current_concurrency.store( _current_concurrency.store(
current_concurrency / 2, butil::memory_order_relaxed); current_concurrency / 2, butil::memory_order_relaxed);
return 0;
} }
} }
...@@ -178,7 +189,8 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) { ...@@ -178,7 +189,8 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) { if (_min_latency_us <= 0) {
_min_latency_us = latency_us; _min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) { } else if (latency_us < _min_latency_us) {
_min_latency_us = _min_latency_us * _smooth + latency_us * (1 - _smooth); _min_latency_us =
_min_latency_us * _smooth + latency_us * (1 - _smooth);
} }
} }
...@@ -189,7 +201,8 @@ void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count, ...@@ -189,7 +201,8 @@ void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count,
_ema_qps = _ema_qps * _smooth + qps * (1 - _smooth); _ema_qps = _ema_qps * _smooth + qps * (1 - _smooth);
} }
void GradientConcurrencyLimiter::UpdateConcurrency(int64_t sampling_time_us) { int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(
int64_t sampling_time_us) {
int32_t current_concurrency = _current_concurrency.load(); int32_t current_concurrency = _current_concurrency.load();
int max_concurrency = _max_concurrency.load(); int max_concurrency = _max_concurrency.load();
int32_t total_succ_req = int32_t total_succ_req =
...@@ -206,40 +219,29 @@ void GradientConcurrencyLimiter::UpdateConcurrency(int64_t sampling_time_us) { ...@@ -206,40 +219,29 @@ void GradientConcurrencyLimiter::UpdateConcurrency(int64_t sampling_time_us) {
reserved_concurrency = std::ceil(std::sqrt(max_concurrency)); reserved_concurrency = std::ceil(std::sqrt(max_concurrency));
} }
int32_t next_concurrency = int32_t next_max_concurrency =
std::ceil(_ema_qps * _min_latency_us / 1000.0 / 1000); std::ceil(_ema_qps * _min_latency_us / 1000.0 / 1000);
int32_t saved_min_latency_us = _min_latency_us;
if (--_reset_count == 0) { if (--_reset_count == 0) {
_reset_count = NextResetCount(); _reset_count = NextResetCount();
if (current_concurrency >= max_concurrency - 2) { if (current_concurrency >= max_concurrency - 2) {
_min_latency_us = -1; _min_latency_us = -1;
next_concurrency -= std::sqrt(max_concurrency); next_max_concurrency -= std::sqrt(max_concurrency);
next_concurrency = std::max(next_concurrency, reserved_concurrency); next_max_concurrency =
std::max(next_max_concurrency, reserved_concurrency);
} else { } else {
// current_concurrency < max_concurrency means the server is // current_concurrency < max_concurrency means the server is
// not overloaded and does not need to detect noload_latency by // not overloaded and does not need to detect noload_latency by
// lowering the maximum concurrency // lowering the maximum concurrency
next_concurrency += reserved_concurrency; next_max_concurrency += reserved_concurrency;
} }
} else { } else {
next_concurrency += reserved_concurrency; next_max_concurrency += reserved_concurrency;
} }
LOG(INFO) if (next_max_concurrency != max_concurrency) {
<< "Update max_concurrency by gradient limiter:" _max_concurrency.store(next_max_concurrency, butil::memory_order_relaxed);
<< " pre_max_concurrency:" << max_concurrency
<< ", min_avg_latency:" << saved_min_latency_us << "us"
<< ", reserved_concurrency:" << reserved_concurrency
<< ", sampling_avg_latency:" << avg_latency << "us"
<< ", failed_punish:" << failed_punish << "us"
<< ", ema_qps:" << _ema_qps
<< ", succ sample count" << _sw.succ_count
<< ", failed sample count" << _sw.failed_count
<< ", current_concurrency:" << current_concurrency
<< ", next_max_concurrency:" << next_concurrency;
if (next_concurrency != max_concurrency) {
_max_concurrency.store(next_concurrency, butil::memory_order_relaxed);
} }
return next_max_concurrency;
} }
} // namespace policy } // namespace policy
......
...@@ -52,12 +52,12 @@ private: ...@@ -52,12 +52,12 @@ private:
int64_t total_succ_us; int64_t total_succ_us;
}; };
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us); int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int NextResetCount(); int NextResetCount();
// The following methods are not thread safe and can only be called // The following methods are not thread safe and can only be called
// in AppSample() // in AppSample()
void UpdateConcurrency(int64_t sampling_time_us); int32_t UpdateMaxConcurrency(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us); void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us); void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us); void ResetSampleWindow(int64_t sampling_time_us);
...@@ -73,7 +73,7 @@ private: ...@@ -73,7 +73,7 @@ private:
bvar::PassiveStatus<int32_t> _max_concurrency_bvar; bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us; butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> _max_concurrency; butil::atomic<int32_t> _max_concurrency;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _total_succ_req; butil::atomic<int32_t> _total_succ_req;
butil::atomic<int32_t> _current_concurrency; butil::atomic<int32_t> _current_concurrency;
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment