Commit a9679e24 authored by TousakaRin's avatar TousakaRin

Modify the algorithm: 1.use the actual measured qps minRtt every random time

parent 77d54240
......@@ -19,12 +19,6 @@
#include "brpc/errno.pb.h"
#include "brpc/policy/gradient_concurrency_limiter.h"
namespace bthread {
namespace brpc {
namespace policy {
......@@ -35,10 +29,10 @@ DEFINE_int32(gradient_cl_sample_window_size_ms, 1000,
"concurrency limiter");
DEFINE_int32(gradient_cl_min_sample_count, 100,
"Minimum sample count for update max concurrency");
DEFINE_int32(gradient_cl_adjust_smooth, 50,
"Smooth coefficient for adjust the max concurrency, the value is 0-99,"
DEFINE_double(gradient_cl_adjust_smooth, 0.9,
"Smooth coefficient for adjust the max concurrency, the value is 0-1,"
"the larger the value, the smaller the amount of each change");
DEFINE_int32(gradient_cl_initial_max_concurrency, 400,
DEFINE_int32(gradient_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter");
DEFINE_bool(gradient_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
......@@ -47,26 +41,25 @@ DEFINE_int32(gradient_cl_max_error_punish_ms, 3000,
DEFINE_double(gradient_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger the "
"configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(gradient_cl_window_count, 30,
"Sample windows count for compute history min average latency");
DEFINE_int32(gradient_cl_reserved_concurrency, 0,
"The maximum concurrency reserved when the service is not overloaded."
"When the traffic increases, the larger the configuration item, the "
"faster the maximum concurrency grows until the server is fully loaded."
"When the value is less than or equal to 0, square root of current "
"concurrency is used.");
DEFINE_double(gradient_cl_min_reduce_ratio, 0.5,
"The minimum reduce ratio of maximum concurrency per calculation."
" The value should be 0-1");
DEFINE_int32(gradient_cl_reset_count, 30,
"The service's latency will be re-measured every `reset_count' windows.");
static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg;
: _ws_queue(FLAGS_gradient_cl_window_count)
, _ws_index(0)
, _unused_max_concurrency(0)
: _unused_max_concurrency(0)
, _reset_count(NextResetCount())
, _min_latency_us(-1)
, _smooth(FLAGS_gradient_cl_adjust_smooth)
, _ema_qps(0)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _max_concurrency(FLAGS_gradient_cl_initial_max_concurrency)
......@@ -98,9 +91,9 @@ void GradientConcurrencyLimiter::Destroy() {
bool GradientConcurrencyLimiter::OnRequested() {
const int32_t current_concurreny =
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurreny >= _max_concurrency.load(butil::memory_order_relaxed)) {
if (current_concurrency >= _max_concurrency.load(butil::memory_order_relaxed)) {
return false;
return true;
......@@ -131,6 +124,11 @@ void GradientConcurrencyLimiter::OnResponded(int error_code,
int GradientConcurrencyLimiter::NextResetCount() {
int max_reset_count = FLAGS_gradient_cl_reset_count;
return rand() % (max_reset_count / 2) + max_reset_count / 2;
void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
int64_t sampling_time_us) {
......@@ -157,7 +155,7 @@ void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
FLAGS_gradient_cl_min_sample_count) {
LOG_EVERY_N(INFO, 100) << "Insufficient sample size";
} else if (_sw.succ_count > 0) {
} else {
LOG(ERROR) << "All request failed, resize max_concurrency";
......@@ -176,99 +174,65 @@ void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.total_succ_us = 0;
void GradientConcurrencyLimiter::UpdateConcurrency() {
void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) {
_min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) {
_min_latency_us = _min_latency_us * _smooth + latency_us * (1 - _smooth);
void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
int32_t qps = double(succ_count) / (sampling_time_us - _sw.start_time_us)
* 1000 * 1000;
_ema_qps = _ema_qps * _smooth + qps * (1 - _smooth);
void GradientConcurrencyLimiter::UpdateConcurrency(int64_t sampling_time_us) {
int32_t current_concurrency = _current_concurrency.load();
int max_concurrency = _max_concurrency.load();
int32_t total_succ_req =, butil::memory_order_relaxed);
int64_t failed_punish = _sw.total_failed_us *
int32_t total_succ_req =, butil::memory_order_relaxed);
int64_t failed_punish =
_sw.total_failed_us * FLAGS_gradient_cl_fail_punish_ratio;
int64_t avg_latency =
(failed_punish + _sw.total_succ_us) / _sw.succ_count;
avg_latency = std::max(static_cast<int64_t>(1), avg_latency);
WindowSnap snap(avg_latency, current_concurrency, total_succ_req);
int64_t min_avg_latency_us = _ws_queue.bottom()->avg_latency_us;
int32_t safe_concurrency = _ws_queue.bottom()->actual_concurrency;
for (size_t i = 0; i < _ws_queue.size(); ++i) {
const WindowSnap& snap = *(_ws_queue.bottom(i));
if (min_avg_latency_us > snap.avg_latency_us) {
min_avg_latency_us = snap.avg_latency_us;
safe_concurrency = snap.actual_concurrency;
} else if (min_avg_latency_us == snap.avg_latency_us) {
safe_concurrency = std::max(safe_concurrency,
int smooth = FLAGS_gradient_cl_adjust_smooth;
if (smooth <= 0 || smooth > 99) {
<< "GFLAG `gradient_cl_adjust_smooth' should be 0-99,"
<< "current: " << FLAGS_gradient_cl_adjust_smooth
<< ", will compute with the defalut smooth value(50)";
smooth = 50;
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
UpdateQps(total_succ_req, sampling_time_us);
int reserved_concurrency = FLAGS_gradient_cl_reserved_concurrency;
if (reserved_concurrency <= 0) {
reserved_concurrency = std::ceil(std::sqrt(max_concurrency));
double fix_gradient = std::min(
1.0, double(min_avg_latency_us) / avg_latency);
int32_t next_concurrency = std::ceil(
max_concurrency * fix_gradient + reserved_concurrency);
next_concurrency = std::ceil(
(max_concurrency * smooth + next_concurrency * (100 - smooth)) / 100);
double min_reduce_ratio = FLAGS_gradient_cl_min_reduce_ratio;
if (min_reduce_ratio <= 0.0 || min_reduce_ratio >= 1.0) {
<< "GFLAG `gradient_cl_min_reduce_ratio' should "
<< "be 0-1, current:" << FLAGS_gradient_cl_min_reduce_ratio
<< " , will compute with the default value(0.5)";
min_reduce_ratio = 0.5;
next_concurrency = std::max(
next_concurrency, int32_t(max_concurrency * min_reduce_ratio));
next_concurrency = std::max(
next_concurrency, int32_t(safe_concurrency * min_reduce_ratio));
if (current_concurrency + reserved_concurrency < max_concurrency &&
max_concurrency < next_concurrency) {
<< "No need to expand the maximum concurrency"
<< ", min_avg_latency:" << min_avg_latency_us << "us"
<< ", sampling_avg_latency:" << avg_latency << "us"
<< ", current_concurrency:" << current_concurrency
<< ", current_max_concurrency:" << max_concurrency
<< ", next_max_concurrency:" << next_concurrency;
if (fix_gradient < 1.0 && max_concurrency < next_concurrency) {
for (size_t i = 0; i < _ws_queue.size(); ++i) {
const WindowSnap& snap = *(_ws_queue.bottom(i));
if (current_concurrency > snap.actual_concurrency &&
total_succ_req < snap.total_succ_req &&
avg_latency > snap.avg_latency_us) {
int32_t fixed_next_concurrency =
std::ceil(snap.actual_concurrency *
snap.avg_latency_us / avg_latency);
next_concurrency =
std::min(next_concurrency, fixed_next_concurrency);
int32_t next_concurrency =
std::ceil(_ema_qps * _min_latency_us / 1000.0 / 1000);
int32_t saved_min_latency_us = _min_latency_us;
if (--_reset_count == 0) {
_reset_count = NextResetCount();
_min_latency_us = -1;
if (current_concurrency >= max_concurrency - 2) {
next_concurrency -= std::sqrt(max_concurrency);
next_concurrency = std::max(next_concurrency, reserved_concurrency);
} else {
// current_concurrency < max_concurrency means the server is
// not overloaded and does not need to detect noload_latency by
// lowering the maximum concurrency
next_concurrency += reserved_concurrency;
} else {
next_concurrency += reserved_concurrency;
<< "Update max_concurrency by gradient limiter:"
<< " pre_max_concurrency:" << max_concurrency
<< ", min_avg_latency:" << min_avg_latency_us << "us"
<< ", min_avg_latency:" << saved_min_latency_us << "us"
<< ", reserved_concurrency:" << reserved_concurrency
<< ", sampling_avg_latency:" << avg_latency << "us"
<< ", failed_punish:" << failed_punish << "us"
<< ", fix_gradient=" << fix_gradient
<< ", ema_qps:" << _ema_qps
<< ", succ sample count" << _sw.succ_count
<< ", failed sample count" << _sw.failed_count
<< ", current_concurrency:" << current_concurrency
......@@ -52,26 +52,23 @@ private:
int64_t total_succ_us;
struct WindowSnap {
WindowSnap(int64_t latency_us, int32_t concurrency, int32_t succ_req)
: avg_latency_us(latency_us)
, actual_concurrency(concurrency)
, total_succ_req(succ_req) {}
int64_t avg_latency_us;
int32_t actual_concurrency;
int32_t total_succ_req;
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int NextResetCount();
//NOT thread-safe, should be called in AddSample()
void UpdateConcurrency();
// The following methods are not thread safe and can only be called
// in AppSample()
void UpdateConcurrency(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void AddMinLatency(int64_t latency_us);
SampleWindow _sw;
butil::BoundedQueue<WindowSnap> _ws_queue;
uint32_t _ws_index;
int32_t _unused_max_concurrency;
int _reset_count;
int64_t _min_latency_us;
const double _smooth;
int32_t _ema_qps;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment