Commit 24afccac authored by TousakaRin's avatar TousakaRin

Move _max_concurrency to the base class and store an AdaptiveMaxConcurrency on each MethodProperty

parent 25744b7f
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include "brpc/concurrency_limiter.h"
namespace brpc {
ConcurrencyLimiter* ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency) {
if (max_concurrency == "constant" && static_cast<int>(max_concurrency) == 0) {
return NULL;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(max_concurrency.name().c_str());
CHECK(cl != NULL)
<< "Fail to find ConcurrencyLimiter by `"
<< max_concurrency.name() << "'";
ConcurrencyLimiter* cl_copy = cl->New();
CHECK(cl_copy != NULL) << "Fail to new ConcurrencyLimiter";
if (max_concurrency == "constant") {
cl_copy->SetMaxConcurrency(max_concurrency);
}
return cl_copy;
}
} // namespace brpc
......@@ -20,12 +20,13 @@
#include "brpc/describable.h"
#include "brpc/destroyable.h"
#include "brpc/extension.h" // Extension<T>
#include "brpc/adaptive_max_concurrency.h" // AdaptiveMaxConcurrency
namespace brpc {
class ConcurrencyLimiter : public Destroyable {
public:
ConcurrencyLimiter() {}
ConcurrencyLimiter(): _max_concurrency(0) {}
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
......@@ -43,14 +44,7 @@ public:
// Returns the current maximum concurrency. Note that the maximum
// concurrency of some ConcurrencyLimiters(eg: `auto', `gradient')
// is dynamically changing.
virtual int MaxConcurrency() const = 0;
// Returns the reference of maximum concurrency. mainly used to explicitly
// specify the maximum concurrency. This method can only be called before
// the server starts.
// NOTE: When using automatic concurrency limiter(eg: `auto', `gradient'),
// the specified maximum concurrency will NOT take effect.
virtual int& MaxConcurrencyRef() = 0;
int MaxConcurrency() { return _max_concurrency; };
// Expose internal vars. NOT thread-safe.
// Return 0 on success, -1 otherwise.
......@@ -61,6 +55,13 @@ public:
virtual ConcurrencyLimiter* New() const = 0;
virtual ~ConcurrencyLimiter() {}
static ConcurrencyLimiter* CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency);
protected:
// Assume int32_t is atomic in x86
int32_t _max_concurrency;
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
......
......@@ -32,14 +32,6 @@ void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
int ConstantConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency;
}
int& ConstantConcurrencyLimiter::MaxConcurrencyRef() {
return _max_concurrency;
}
int ConstantConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
return 0;
}
......
......@@ -24,23 +24,18 @@ namespace policy {
class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
ConstantConcurrencyLimiter()
: _max_concurrency(0),
_current_concurrency(0) {}
ConstantConcurrencyLimiter() {}
~ConstantConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int MaxConcurrency() const override;
int& MaxConcurrencyRef() override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
void Destroy() override;
private:
int32_t _max_concurrency;
butil::atomic<int32_t> _current_concurrency;
};
......
......@@ -36,8 +36,6 @@ DEFINE_int32(gradient_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter");
DEFINE_bool(gradient_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
DEFINE_int32(gradient_cl_max_error_punish_ms, 3000,
"The maximum time wasted for a single failed request");
DEFINE_double(gradient_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger the "
"configuration item, the more aggressive the penalty strategy.");
......@@ -55,24 +53,15 @@ static int32_t cast_max_concurrency(void* arg) {
}
GradientConcurrencyLimiter::GradientConcurrencyLimiter()
: _unused_max_concurrency(0)
, _reset_count(NextResetCount())
: _reset_count(NextResetCount())
, _min_latency_us(-1)
, _smooth(FLAGS_gradient_cl_adjust_smooth)
, _ema_qps(0)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _max_concurrency(FLAGS_gradient_cl_initial_max_concurrency)
, _total_succ_req(0)
, _current_concurrency(0) {
}
int GradientConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency.load(butil::memory_order_relaxed);
}
int& GradientConcurrencyLimiter::MaxConcurrencyRef() {
return _unused_max_concurrency;
_max_concurrency = FLAGS_gradient_cl_initial_max_concurrency;
}
int GradientConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
......@@ -93,14 +82,13 @@ void GradientConcurrencyLimiter::Destroy() {
bool GradientConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurrency >= _max_concurrency.load(butil::memory_order_relaxed)) {
if (current_concurrency >= _max_concurrency) {
return false;
}
return true;
}
void GradientConcurrencyLimiter::OnResponded(int error_code,
int64_t latency_us) {
void GradientConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_total_succ_req.fetch_add(1, butil::memory_order_relaxed);
......@@ -145,9 +133,6 @@ int32_t GradientConcurrencyLimiter::AddSample(int error_code,
if (error_code != 0 &&
FLAGS_gradient_cl_enable_error_punish) {
++_sw.failed_count;
latency_us =
std::min(int64_t(FLAGS_gradient_cl_max_error_punish_ms) * 1000,
latency_us);
_sw.total_failed_us += latency_us;
} else if (error_code == 0) {
++_sw.succ_count;
......@@ -187,22 +172,18 @@ void GradientConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) {
_min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) {
_min_latency_us =
_min_latency_us * _smooth + latency_us * (1 - _smooth);
_min_latency_us = _min_latency_us * _smooth + latency_us * (1 - _smooth);
}
}
void GradientConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
double qps = double(succ_count) / (sampling_time_us - _sw.start_time_us)
* 1000 * 1000;
double qps = succ_count / (sampling_time_us - _sw.start_time_us) * 1000000.0;
_ema_qps = _ema_qps * _smooth + qps * (1 - _smooth);
}
int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(
int64_t sampling_time_us) {
int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t current_concurrency = _current_concurrency.load();
int max_concurrency = _max_concurrency.load();
int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed);
int64_t failed_punish =
......@@ -214,20 +195,20 @@ int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(
int reserved_concurrency = FLAGS_gradient_cl_reserved_concurrency;
if (reserved_concurrency <= 0) {
reserved_concurrency = std::ceil(std::sqrt(max_concurrency));
reserved_concurrency = std::ceil(std::sqrt(_max_concurrency));
}
int32_t next_max_concurrency =
std::ceil(_ema_qps * _min_latency_us / 1000000.0);
if (--_reset_count == 0) {
_reset_count = NextResetCount();
if (current_concurrency >= max_concurrency - 2) {
if (current_concurrency >= _max_concurrency - 2) {
_min_latency_us = -1;
next_max_concurrency -= std::sqrt(max_concurrency);
next_max_concurrency -= std::sqrt(_max_concurrency);
next_max_concurrency =
std::max(next_max_concurrency, reserved_concurrency);
} else {
// current_concurrency < max_concurrency means the server is
// current_concurrency < _max_concurrency means the server is
// not overloaded and does not need to detect noload_latency by
// lowering the maximum concurrency
next_max_concurrency += reserved_concurrency;
......@@ -236,12 +217,11 @@ int32_t GradientConcurrencyLimiter::UpdateMaxConcurrency(
next_max_concurrency += reserved_concurrency;
}
if (next_max_concurrency != max_concurrency) {
_max_concurrency.store(next_max_concurrency, butil::memory_order_relaxed);
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
return next_max_concurrency;
}
} // namespace policy
} // namespace brpc
......@@ -30,8 +30,6 @@ public:
~GradientConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int MaxConcurrency() const override;
int& MaxConcurrencyRef() override;
int Expose(const butil::StringPiece& prefix) override;
GradientConcurrencyLimiter* New() const override;
......@@ -58,10 +56,9 @@ private:
// The following methods are not thread safe and can only be called
// in AppSample()
int32_t UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void AddMinLatency(int64_t latency_us);
SampleWindow _sw;
int32_t _unused_max_concurrency;
......@@ -72,7 +69,6 @@ private:
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> _max_concurrency;
butil::atomic<int32_t> _total_succ_req;
butil::atomic<int32_t> _current_concurrency;
};
......
......@@ -664,7 +664,7 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}
static int g_default_max_concurrency_of_method = 0;
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method = 0;
int Server::StartInternal(const butil::ip_t& ip,
const PortRange& port_range,
......@@ -872,42 +872,25 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads);
}
if (_options.max_concurrency == "constant") {
if (static_cast<int>(_options.max_concurrency) != 0) {
const ConcurrencyLimiter* constant_cl =
ConcurrencyLimiterExtension()->Find("constant");
if (NULL == constant_cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `constant'";
}
ConcurrencyLimiter* cl_copy = constant_cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
_cl->MaxConcurrencyRef() = _options.max_concurrency;
}
} else {
const ConcurrencyLimiter* cl = NULL;
cl = ConcurrencyLimiterExtension()->Find(
_options.max_concurrency.name().c_str());
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
if (_options.max_concurrency != "constant" ||
static_cast<int>(_options.max_concurrency) != 0) {
_cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
_options.max_concurrency);
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
continue;
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
continue;
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
it->second.status->SetConcurrencyLimiter(cl_copy);
if (it->second.max_concurrency == "constant" &&
static_cast<int>(_options.max_concurrency) == 0) {
continue;
}
it->second.status->SetConcurrencyLimiter(
ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
it->second.max_concurrency));
}
// Create listening ports
if (port_range.min_port > port_range.max_port) {
LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
......@@ -2001,18 +1984,19 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
return 0;
}
int& Server::MaxConcurrencyOf(MethodProperty* mp) {
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
//TODO
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return mp->status->max_concurrency_ref();
return mp->max_concurrency;
}
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
......@@ -2023,11 +2007,10 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (mp == NULL || mp->status == NULL) {
return 0;
}
const MethodStatus* mp_status = mp->status;
return mp_status->max_concurrency();
return mp->max_concurrency;
}
int& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
......@@ -2041,7 +2024,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
return MaxConcurrencyOf(_method_map.seek(full_method_name));
}
int& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
......@@ -2060,7 +2043,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
full_service_name, method_name));
}
int& Server::MaxConcurrencyOf(google::protobuf::Service* service,
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}
......
......@@ -334,6 +334,7 @@ public:
google::protobuf::Service* service;
const google::protobuf::MethodDescriptor* method;
MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
MethodProperty();
};
......@@ -497,15 +498,15 @@ public:
// an auto concurrency limiter, eg `options.max_concurrency = "auto"`.If you
// still called non-const version of the interface, your changes to the
// maximum concurrency will not take effect.
int& MaxConcurrencyOf(const butil::StringPiece& full_method_name);
AdaptiveMaxConcurrency& MaxConcurrencyOf(const butil::StringPiece& full_method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_method_name) const;
int& MaxConcurrencyOf(const butil::StringPiece& full_service_name,
AdaptiveMaxConcurrency& MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const;
int& MaxConcurrencyOf(google::protobuf::Service* service,
AdaptiveMaxConcurrency& MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name);
int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;
......@@ -599,7 +600,7 @@ friend class Controller;
static bool ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map);
static bool ClearCertMapping(CertMaps& bg);
int& MaxConcurrencyOf(MethodProperty*);
AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
DISALLOW_COPY_AND_ASSIGN(Server);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment