Commit 7b65d3ab authored by TousakaRin


Fix bugs: 1. constant_cl for a method doesn't work; 2. qps = 0 in gradient_cl; 3. ServerOptions.max_concurrency can't return the correct value when using auto_cl
parent be024bfb
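
For context on fix 3, a minimal usage sketch of the new Server::MaxConcurrency() accessor follows; the "gradient" option string and the omitted AddService/Start calls are illustrative assumptions, not part of this commit.

#include <cstdio>
#include <brpc/server.h>

int main() {
    brpc::Server server;
    brpc::ServerOptions options;
    // Assumption: enable the adaptive limiter via the server-level option;
    // the exact option string is not shown in this diff.
    options.max_concurrency = "gradient";
    // ... AddService(...) and server.Start(port, &options) would go here ...

    // Fix 3: under an adaptive limiter, options().max_concurrency no longer
    // reflects the live limit, so the new accessor asks the limiter itself
    // (or falls back to the per-method default when no limiter is installed).
    printf("current max_concurrency=%d\n", server.MaxConcurrency());
    return 0;
}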
@@ -392,7 +392,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
     if (!server_accessor.AddConcurrency(cntl.get())) {
         cntl->SetFailed(
             ELIMIT, "Reached server's max_concurrency=%d",
-            static_cast<int>(server->options().max_concurrency));
+            server->MaxConcurrency());
         break;
     }
......
@@ -22,6 +22,7 @@
 namespace brpc {
 namespace policy {
 
+DEFINE_int32(gradient_cl_peak_qps_window_size, 30, "");
 DEFINE_int32(gradient_cl_sampling_interval_us, 100,
     "Interval for sampling request in gradient concurrency limiter");
 DEFINE_int32(gradient_cl_sample_window_size_ms, 1000,
@@ -56,7 +57,8 @@ GradientConcurrencyLimiter::GradientConcurrencyLimiter()
     : _reset_count(NextResetCount())
     , _min_latency_us(-1)
     , _smooth(FLAGS_gradient_cl_adjust_smooth)
-    , _ema_qps(0)
+    , _ema_peak_qps(-1)
+    , _qps_bq(FLAGS_gradient_cl_peak_qps_window_size)
     , _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
     , _last_sampling_time_us(0)
     , _total_succ_req(0)
@@ -178,15 +180,21 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
 void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count,
                                            int64_t sampling_time_us) {
-    double qps = succ_count / (sampling_time_us - _sw.start_time_us) * 1000000.0;
-    _ema_qps = _ema_qps * _smooth + qps * (1 - _smooth);
+    double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
+    _qps_bq.elim_push(qps);
+    double peak_qps = *(_qps_bq.bottom());
+    for (size_t i = 0; i < _qps_bq.size(); ++i) {
+        peak_qps = std::max(*(_qps_bq.bottom(i)), peak_qps);
+    }
+    _ema_peak_qps = _ema_peak_qps * _smooth + peak_qps * (1 - _smooth);
 }
 
 int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
     int32_t current_concurrency = _current_concurrency.load();
     int32_t total_succ_req =
         _total_succ_req.exchange(0, butil::memory_order_relaxed);
-    int64_t failed_punish =
+    double failed_punish =
         _sw.total_failed_us * FLAGS_gradient_cl_fail_punish_ratio;
     int64_t avg_latency =
         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
@@ -199,7 +207,7 @@ int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_u
     }
     int32_t next_max_concurrency =
-        std::ceil(_ema_qps * _min_latency_us / 1000000.0);
+        std::ceil(_ema_peak_qps * _min_latency_us / 1000000.0);
     if (--_reset_count == 0) {
         _reset_count = NextResetCount();
         if (current_concurrency >= _max_concurrency - 2) {
......
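
To make the new QPS path easier to follow, here is a self-contained sketch of the arithmetic introduced above: keep a bounded window of per-sample QPS, take the window's peak, smooth that peak with an EMA, and derive the next concurrency limit as ceil(ema_peak_qps * min_latency_us / 1e6). The std::deque window, the fixed constants, and the seeding of the EMA from -1 are illustrative stand-ins, not the brpc types or gflags.

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <deque>

int main() {
    const size_t window_size = 30;       // stand-in for gradient_cl_peak_qps_window_size
    const double smooth = 0.9;           // stand-in for gradient_cl_adjust_smooth
    const double min_latency_us = 2000;  // assumed minimal latency sample, in us
    std::deque<double> qps_window;       // stand-in for butil::BoundedQueue<double>
    double ema_peak_qps = -1;

    const double samples[] = {800, 1200, 950, 1100};  // made-up per-interval QPS
    for (double qps : samples) {
        // elim_push(): evict the oldest sample once the window is full.
        if (qps_window.size() == window_size) {
            qps_window.pop_front();
        }
        qps_window.push_back(qps);
        // Peak over the whole window, then exponential smoothing of the peak.
        const double peak_qps =
                *std::max_element(qps_window.begin(), qps_window.end());
        ema_peak_qps = (ema_peak_qps < 0)
                ? peak_qps  // how the first sample is seeded is a guess here
                : ema_peak_qps * smooth + peak_qps * (1 - smooth);
    }

    // Same formula as the diff: next limit = ceil(ema_peak_qps * min_latency / 1e6).
    const int next_max_concurrency =
            (int)std::ceil(ema_peak_qps * min_latency_us / 1000000.0);
    printf("ema_peak_qps=%.1f next_max_concurrency=%d\n",
           ema_peak_qps, next_max_concurrency);
    return 0;
}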
@@ -59,13 +59,14 @@ private:
     void ResetSampleWindow(int64_t sampling_time_us);
     void UpdateMinLatency(int64_t latency_us);
     void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
+    double peak_qps();
 
     SampleWindow _sw;
-    int32_t _unused_max_concurrency;
     int _reset_count;
     int64_t _min_latency_us;
     const double _smooth;
-    double _ema_qps;
+    double _ema_peak_qps;
+    butil::BoundedQueue<double> _qps_bq;
     butil::Mutex _sw_mutex;
     bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
     butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
......
@@ -1187,7 +1187,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
     }
     if (!server_accessor.AddConcurrency(cntl.get())) {
         cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
-                        static_cast<int>(server->options().max_concurrency));
+                        server->MaxConcurrency());
         return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -424,7 +424,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
     if (!server_accessor.AddConcurrency(cntl.get())) {
         cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
-                        static_cast<int>(server->options().max_concurrency));
+                        server->MaxConcurrency());
         break;
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -222,7 +222,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
     if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
         mongo_done->cntl.SetFailed(
             ELIMIT, "Reached server's max_concurrency=%d",
-            static_cast<int>(server->options().max_concurrency));
+            server->MaxConcurrency());
         break;
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -294,7 +294,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
     if (!server_accessor.AddConcurrency(cntl)) {
         cntl->SetFailed(
             ELIMIT, "Reached server's max_concurrency=%d",
-            static_cast<int>(server->options().max_concurrency));
+            server->MaxConcurrency());
         break;
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -388,7 +388,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
     if (!server_accessor.AddConcurrency(cntl.get())) {
         cntl->SetFailed(
             ELIMIT, "Reached server's max_concurrency=%d",
-            static_cast<int>(server->options().max_concurrency));
+            server->MaxConcurrency());
         break;
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -523,7 +523,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
     }
     if (!server_accessor.AddConcurrency(cntl)) {
         cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
-                        static_cast<int>(server->options().max_concurrency));
+                        server->MaxConcurrency());
         break;
     }
     if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
@@ -876,6 +876,7 @@ int Server::StartInternal(const butil::ip_t& ip,
         static_cast<int>(_options.max_concurrency) != 0) {
         _cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
             _options.max_concurrency);
+        _cl->Expose("Global_Concurrency_Limiter");
     }
     for (MethodMap::iterator it = _method_map.begin();
          it != _method_map.end(); ++it) {
@@ -883,7 +884,7 @@ int Server::StartInternal(const butil::ip_t& ip,
             continue;
         }
         if (it->second.max_concurrency == "constant" &&
-            static_cast<int>(_options.max_concurrency) == 0) {
+            static_cast<int>(it->second.max_concurrency) == 0) {
             continue;
         }
         it->second.status->SetConcurrencyLimiter(
@@ -1984,6 +1985,14 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
     return 0;
 }
 
+int Server::MaxConcurrency() const {
+    if (NULL != _cl) {
+        return _cl->MaxConcurrency();
+    } else {
+        return g_default_max_concurrency_of_method;
+    }
+}
+
 AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
     if (IsRunning()) {
         LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
......
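
The constant_cl fix in the StartInternal hunk above changes which value is checked: the method's own max_concurrency rather than the server-wide option. A setup like the following, with a per-method constant limit and ServerOptions.max_concurrency left at its default of 0, is the case that previously skipped installing the method's limiter; the EchoService names come from the example comment in server.h and the commented-out calls are placeholders, not code in this commit.

#include <brpc/server.h>

int main() {
    brpc::Server server;
    // ... AddService() for a service exposing example.EchoService.Echo ...

    // Per-method constant limit; must be set before Start().
    server.MaxConcurrencyOf("example.EchoService.Echo") = 10;

    brpc::ServerOptions options;  // options.max_concurrency stays at 0 (unlimited)
    // server.Start(8000, &options);
    return 0;
}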
@@ -488,6 +488,9 @@ public:
     // This method is already deprecated.You should NOT call it anymore.
     int ResetMaxConcurrency(int max_concurrency);
 
+    // Server's current max concurrency
+    int MaxConcurrency() const;
+
     // Get/set max_concurrency associated with a method.
     // Example:
     //    server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
......