Commit ea5151d3 authored by TousakaRin's avatar TousakaRin

rename class: GradientConcurrencyLimiter to AutoConcurrencyLimiter

parent f7daa9b7
...@@ -67,7 +67,7 @@ ...@@ -67,7 +67,7 @@
// Concurrency Limiters // Concurrency Limiters
#include "brpc/concurrency_limiter.h" #include "brpc/concurrency_limiter.h"
#include "brpc/policy/gradient_concurrency_limiter.h" #include "brpc/policy/auto_concurrency_limiter.h"
#include "brpc/policy/constant_concurrency_limiter.h" #include "brpc/policy/constant_concurrency_limiter.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger #include "brpc/input_messenger.h" // get_or_new_client_side_messenger
...@@ -125,7 +125,7 @@ struct GlobalExtensions { ...@@ -125,7 +125,7 @@ struct GlobalExtensions {
ConsistentHashingLoadBalancer ch_md5_lb; ConsistentHashingLoadBalancer ch_md5_lb;
DynPartLoadBalancer dynpart_lb; DynPartLoadBalancer dynpart_lb;
GradientConcurrencyLimiter gradient_cl; AutoConcurrencyLimiter auto_cl;
ConstantConcurrencyLimiter constant_cl; ConstantConcurrencyLimiter constant_cl;
}; };
...@@ -558,8 +558,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -558,8 +558,7 @@ static void GlobalInitializeOrDieImpl() {
} }
// Concurrency Limiters // Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->gradient_cl); ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("gradient", &g_ext->gradient_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl); ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
if (FLAGS_usercode_in_pthread) { if (FLAGS_usercode_in_pthread) {
......
...@@ -17,71 +17,71 @@ ...@@ -17,71 +17,71 @@
#include <cmath> #include <cmath>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "brpc/errno.pb.h" #include "brpc/errno.pb.h"
#include "brpc/policy/gradient_concurrency_limiter.h" #include "brpc/policy/auto_concurrency_limiter.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
DEFINE_int32(gradient_cl_peak_qps_window_size, 30, ""); DEFINE_int32(auto_cl_peak_qps_window_size, 30, "");
DEFINE_int32(gradient_cl_sampling_interval_us, 100, DEFINE_int32(auto_cl_sampling_interval_us, 100,
"Interval for sampling request in gradient concurrency limiter"); "Interval for sampling request in auto concurrency limiter");
DEFINE_int32(gradient_cl_sample_window_size_ms, 1000, DEFINE_int32(auto_cl_sample_window_size_ms, 1000,
"Sample window size for update max concurrency in grandient " "Sample window size for update max concurrency in grandient "
"concurrency limiter"); "concurrency limiter");
DEFINE_int32(gradient_cl_min_sample_count, 100, DEFINE_int32(auto_cl_min_sample_count, 100,
"Minimum sample count for update max concurrency"); "Minimum sample count for update max concurrency");
DEFINE_double(gradient_cl_adjust_smooth, 0.9, DEFINE_double(auto_cl_adjust_smooth, 0.9,
"Smooth coefficient for adjust the max concurrency, the value is 0-1," "Smooth coefficient for adjust the max concurrency, the value is 0-1,"
"the larger the value, the smaller the amount of each change"); "the larger the value, the smaller the amount of each change");
DEFINE_int32(gradient_cl_initial_max_concurrency, 40, DEFINE_int32(auto_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter"); "Initial max concurrency for grandient concurrency limiter");
DEFINE_bool(gradient_cl_enable_error_punish, true, DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency"); "Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(gradient_cl_fail_punish_ratio, 1.0, DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger the " "Use the failed requests to punish normal requests. The larger the "
"configuration item, the more aggressive the penalty strategy."); "configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(gradient_cl_reserved_concurrency, 0, DEFINE_int32(auto_cl_reserved_concurrency, 0,
"The maximum concurrency reserved when the service is not overloaded." "The maximum concurrency reserved when the service is not overloaded."
"When the traffic increases, the larger the configuration item, the " "When the traffic increases, the larger the configuration item, the "
"faster the maximum concurrency grows until the server is fully loaded." "faster the maximum concurrency grows until the server is fully loaded."
"When the value is less than or equal to 0, square root of current " "When the value is less than or equal to 0, square root of current "
"concurrency is used."); "concurrency is used.");
DEFINE_int32(gradient_cl_reset_count, 30, DEFINE_int32(auto_cl_reset_count, 30,
"The service's latency will be re-measured every `reset_count' windows."); "The service's latency will be re-measured every `reset_count' windows.");
static int32_t cast_max_concurrency(void* arg) { static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg; return *(int32_t*) arg;
} }
GradientConcurrencyLimiter::GradientConcurrencyLimiter() AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _reset_count(NextResetCount()) : _reset_count(NextResetCount())
, _min_latency_us(-1) , _min_latency_us(-1)
, _smooth(FLAGS_gradient_cl_adjust_smooth) , _smooth(FLAGS_auto_cl_adjust_smooth)
, _ema_peak_qps(-1) , _ema_peak_qps(-1)
, _qps_bq(FLAGS_gradient_cl_peak_qps_window_size) , _qps_bq(FLAGS_auto_cl_peak_qps_window_size)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency) , _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0) , _last_sampling_time_us(0)
, _total_succ_req(0) , _total_succ_req(0)
, _current_concurrency(0) { , _current_concurrency(0) {
_max_concurrency = FLAGS_gradient_cl_initial_max_concurrency; _max_concurrency = FLAGS_auto_cl_initial_max_concurrency;
} }
int GradientConcurrencyLimiter::Expose(const butil::StringPiece& prefix) { int AutoConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
if (_max_concurrency_bvar.expose_as(prefix, "gradient_cl_max_concurrency") != 0) { if (_max_concurrency_bvar.expose_as(prefix, "auto_cl_max_concurrency") != 0) {
return -1; return -1;
} }
return 0; return 0;
} }
GradientConcurrencyLimiter* GradientConcurrencyLimiter::New() const { AutoConcurrencyLimiter* AutoConcurrencyLimiter::New() const {
return new (std::nothrow) GradientConcurrencyLimiter; return new (std::nothrow) AutoConcurrencyLimiter;
} }
void GradientConcurrencyLimiter::Destroy() { void AutoConcurrencyLimiter::Destroy() {
delete this; delete this;
} }
bool GradientConcurrencyLimiter::OnRequested() { bool AutoConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency = const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed); _current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurrency >= _max_concurrency) { if (current_concurrency >= _max_concurrency) {
...@@ -90,7 +90,7 @@ bool GradientConcurrencyLimiter::OnRequested() { ...@@ -90,7 +90,7 @@ bool GradientConcurrencyLimiter::OnRequested() {
return true; return true;
} }
void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) { void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed); _current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) { if (0 == error_code) {
_total_succ_req.fetch_add(1, butil::memory_order_relaxed); _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
...@@ -104,7 +104,7 @@ void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) ...@@ -104,7 +104,7 @@ void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us)
if (last_sampling_time_us == 0 || if (last_sampling_time_us == 0 ||
now_time_us - last_sampling_time_us >= now_time_us - last_sampling_time_us >=
FLAGS_gradient_cl_sampling_interval_us) { FLAGS_auto_cl_sampling_interval_us) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_weak( bool sample_this_call = _last_sampling_time_us.compare_exchange_weak(
last_sampling_time_us, now_time_us, last_sampling_time_us, now_time_us,
butil::memory_order_relaxed); butil::memory_order_relaxed);
...@@ -112,19 +112,19 @@ void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) ...@@ -112,19 +112,19 @@ void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us)
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us); int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) { if (max_concurrency != 0) {
LOG(INFO) LOG(INFO)
<< "MaxConcurrency updated by gradient limiter:" << "MaxConcurrency updated by auto limiter:"
<< "current_max_concurrency:" << max_concurrency; << "current_max_concurrency:" << max_concurrency;
} }
} }
} }
} }
int GradientConcurrencyLimiter::NextResetCount() { int AutoConcurrencyLimiter::NextResetCount() {
int max_reset_count = FLAGS_gradient_cl_reset_count; int max_reset_count = FLAGS_auto_cl_reset_count;
return butil::fast_rand_less_than(max_reset_count / 2) + max_reset_count / 2; return butil::fast_rand_less_than(max_reset_count / 2) + max_reset_count / 2;
} }
int32_t GradientConcurrencyLimiter::AddSample(int error_code, int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us, int64_t latency_us,
int64_t sampling_time_us) { int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex); std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
...@@ -133,7 +133,7 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code, ...@@ -133,7 +133,7 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code,
} }
if (error_code != 0 && if (error_code != 0 &&
FLAGS_gradient_cl_enable_error_punish) { FLAGS_auto_cl_enable_error_punish) {
++_sw.failed_count; ++_sw.failed_count;
_sw.total_failed_us += latency_us; _sw.total_failed_us += latency_us;
} else if (error_code == 0) { } else if (error_code == 0) {
...@@ -142,9 +142,9 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code, ...@@ -142,9 +142,9 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code,
} }
if (sampling_time_us - _sw.start_time_us < if (sampling_time_us - _sw.start_time_us <
FLAGS_gradient_cl_sample_window_size_ms * 1000 || FLAGS_auto_cl_sample_window_size_ms * 1000 ||
_sw.succ_count + _sw.failed_count < _sw.succ_count + _sw.failed_count <
FLAGS_gradient_cl_min_sample_count) { FLAGS_auto_cl_min_sample_count) {
return 0; return 0;
} }
...@@ -162,7 +162,7 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code, ...@@ -162,7 +162,7 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code,
} }
} }
void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) { void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.start_time_us = sampling_time_us; _sw.start_time_us = sampling_time_us;
_sw.succ_count = 0; _sw.succ_count = 0;
_sw.failed_count = 0; _sw.failed_count = 0;
...@@ -170,7 +170,7 @@ void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) { ...@@ -170,7 +170,7 @@ void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.total_succ_us = 0; _sw.total_succ_us = 0;
} }
void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) { void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) { if (_min_latency_us <= 0) {
_min_latency_us = latency_us; _min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) { } else if (latency_us < _min_latency_us) {
...@@ -178,7 +178,7 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) { ...@@ -178,7 +178,7 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
} }
} }
void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count, void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) { int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us); double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
_qps_bq.elim_push(qps); _qps_bq.elim_push(qps);
...@@ -190,18 +190,18 @@ void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count, ...@@ -190,18 +190,18 @@ void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count,
_ema_peak_qps = _ema_peak_qps * _smooth + peak_qps * (1 - _smooth); _ema_peak_qps = _ema_peak_qps * _smooth + peak_qps * (1 - _smooth);
} }
int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) { int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t current_concurrency = _current_concurrency.load(); int32_t current_concurrency = _current_concurrency.load();
int32_t total_succ_req = int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed); _total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish = double failed_punish =
_sw.total_failed_us * FLAGS_gradient_cl_fail_punish_ratio; _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency = int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count); std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
UpdateMinLatency(avg_latency); UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us); UpdateQps(total_succ_req, sampling_time_us);
int reserved_concurrency = FLAGS_gradient_cl_reserved_concurrency; int reserved_concurrency = FLAGS_auto_cl_reserved_concurrency;
if (reserved_concurrency <= 0) { if (reserved_concurrency <= 0) {
reserved_concurrency = std::ceil(std::sqrt(_max_concurrency)); reserved_concurrency = std::ceil(std::sqrt(_max_concurrency));
} }
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
// //
// Authors: Lei He (helei@qiyi.com) // Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H #ifndef BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H #define BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
#include "bvar/bvar.h" #include "bvar/bvar.h"
#include "butil/containers/bounded_queue.h" #include "butil/containers/bounded_queue.h"
...@@ -24,15 +24,15 @@ ...@@ -24,15 +24,15 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
class GradientConcurrencyLimiter : public ConcurrencyLimiter { class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public: public:
GradientConcurrencyLimiter(); AutoConcurrencyLimiter();
~GradientConcurrencyLimiter() {} ~AutoConcurrencyLimiter() {}
bool OnRequested() override; bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override; void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override; int Expose(const butil::StringPiece& prefix) override;
GradientConcurrencyLimiter* New() const override; AutoConcurrencyLimiter* New() const override;
void Destroy() override; void Destroy() override;
private: private:
...@@ -78,4 +78,4 @@ private: ...@@ -78,4 +78,4 @@ private:
} // namespace brpc } // namespace brpc
#endif // BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H #endif // BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment