Commit 7c65df38 authored by Ge Jun's avatar Ge Jun

Simplify AutoConcurrencyLimiter

parent 684d7e57
......@@ -51,57 +51,32 @@ DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg;
}
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _remeasure_start_us(NextResetTime(butil::gettimeofday_us()))
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
, _remeasure_start_us(NextResetTime(butil::gettimeofday_us()))
, _reset_latency_us(0)
, _min_latency_us(-1)
, _ema_peak_qps(-1)
, _ema_factor(FLAGS_auto_cl_alpha_factor_for_ema)
, _overload_threshold(FLAGS_auto_cl_overload_threshold)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _total_succ_req(0)
, _current_concurrency(0) {
_max_concurrency = FLAGS_auto_cl_initial_max_concurrency;
}
int AutoConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
if (_max_concurrency_bvar.expose_as(prefix, "auto_cl_max_concurrency") != 0) {
return -1;
}
return 0;
, _total_succ_req(0) {
}
AutoConcurrencyLimiter* AutoConcurrencyLimiter::New() const {
AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency&) const {
return new (std::nothrow) AutoConcurrencyLimiter;
}
void AutoConcurrencyLimiter::Destroy() {
delete this;
}
bool AutoConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurrency >= _max_concurrency) {
return false;
}
return true;
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
return current_concurrency <= _max_concurrency;
}
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_total_succ_req.fetch_add(1, butil::memory_order_relaxed);
} else if (ELIMIT == error_code) {
return;
}
int64_t now_time_us = butil::gettimeofday_us();
const int64_t now_time_us = butil::gettimeofday_us();
int64_t last_sampling_time_us =
_last_sampling_time_us.load(butil::memory_order_relaxed);
......@@ -111,17 +86,15 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) {
LOG_EVERY_N(INFO, 60)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency
<< ", min_latency_us: " << _min_latency_us;
}
AddSample(error_code, latency_us, now_time_us);
}
}
}
int AutoConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency;
}
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
......@@ -129,17 +102,17 @@ int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
return reset_start_us;
}
int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
void AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
// Waiting for the current concurrent decline
if (_reset_latency_us > sampling_time_us) {
return 0;
}
// Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0) {
if (_reset_latency_us != 0) {
// min_latency is about to be reset soon.
if (_reset_latency_us > sampling_time_us) {
// ignoring samples during waiting for the deadline.
return;
}
// Remeasure min_latency when concurrency has dropped to low load
_min_latency_us = -1;
_reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us);
......@@ -165,24 +138,21 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return 0;
return;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return 0;
return;
}
if(_sw.succ_count > 0) {
int max_concurrency = UpdateMaxConcurrency(sampling_time_us);
ResetSampleWindow(sampling_time_us);
return max_concurrency;
UpdateMaxConcurrency(sampling_time_us);
} else {
// All request failed
_max_concurrency /= 2;
ResetSampleWindow(sampling_time_us);
return 0;
}
ResetSampleWindow(sampling_time_us);
}
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
......@@ -194,26 +164,26 @@ void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
}
void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
if (_min_latency_us <= 0) {
_min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) {
_min_latency_us = latency_us * _ema_factor + _min_latency_us * (1 - _ema_factor);
_min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
}
}
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
if (qps >= _ema_peak_qps) {
_ema_peak_qps = qps;
} else {
_ema_peak_qps =
qps * (_ema_factor / 10) + _ema_peak_qps * (1 - _ema_factor / 10);
_ema_peak_qps = qps * ema_factor + _ema_peak_qps * (1 - ema_factor);
}
}
int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish =
......@@ -223,18 +193,18 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 0.75;
next_max_concurrency = _max_concurrency * 3 / 4;
} else {
const double overload_threshold = FLAGS_auto_cl_overload_threshold;
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
if (avg_latency < (1.0 + _overload_threshold) * _min_latency_us) {
if (avg_latency < (1.0 + overload_threshold) * _min_latency_us) {
next_max_concurrency = std::ceil(noload_concurrency *
(2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
(2.0 + overload_threshold - double(avg_latency) / _min_latency_us));
} else {
next_max_concurrency = noload_concurrency;
}
......@@ -243,7 +213,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
return next_max_concurrency;
}
} // namespace policy
......
......@@ -27,13 +27,14 @@ namespace policy {
class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public:
AutoConcurrencyLimiter();
~AutoConcurrencyLimiter() {}
bool OnRequested() override;
bool OnRequested(int current_concurrency) override;
void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override;
AutoConcurrencyLimiter* New() const override;
void Destroy() override;
int MaxConcurrency() override;
AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
private:
struct SampleWindow {
......@@ -50,31 +51,31 @@ private:
int64_t total_succ_us;
};
int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called
// in AppSample()
int32_t UpdateMaxConcurrency(int64_t sampling_time_us);
void UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
double peak_qps();
SampleWindow _sw;
// modified per sample-window or more
int _max_concurrency;
int64_t _remeasure_start_us;
int64_t _reset_latency_us;
int64_t _min_latency_us;
int64_t _min_latency_us;
double _ema_peak_qps;
const double _ema_factor;
const double _overload_threshold;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
// modified per sample.
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> _total_succ_req;
butil::atomic<int32_t> _current_concurrency;
butil::Mutex _sw_mutex;
SampleWindow _sw;
// modified per request.
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _total_succ_req;
};
} // namespace policy
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment