Unverified Commit 329f3c62 authored by Ge Jun, committed by GitHub

Merge pull request #451 from brpc/polish_cl_code

Polish cl code
parents 6ba83745 792abefd
......@@ -17,7 +17,7 @@
- **latency_cdf**: shows percentiles as a [CDF](https://en.wikipedia.org/wiki/Cumulative_distribution_function); only viewable in html.
- **max_latency**: on html, *from right to left*, the max latency over the past 60 seconds, 60 minutes, 24 hours and 30 days. In plain text, the max latency within the last 10 seconds (controlled by [-bvar_dump_interval](http://brpc.baidu.com:8765/flags/bvar_dump_interval)).
- **qps**: on html, from right to left, the average qps (Queries Per Second) over the past 60 seconds, 60 minutes, 24 hours and 30 days. In plain text, the average qps within the last 10 seconds (controlled by [-bvar_dump_interval](http://brpc.baidu.com:8765/flags/bvar_dump_interval)).
- **processing**: number of requests being processed. If this metric stays non-zero after the traffic drops to 0, the server very likely has a bug, such as forgetting to call done or being stuck on some processing step.
- **processing**: (renamed to concurrency in newer versions) number of requests being processed. If this metric stays non-zero after the traffic drops to 0, the server very likely has a bug, such as forgetting to call done or being stuck on some processing step.
Users can customize the description shown on the /status page by making the corresponding Service implement [brpc::Describable](https://github.com/brpc/brpc/blob/master/src/brpc/describable.h).
......
......@@ -17,7 +17,7 @@ Meanings of the fields above:
- **latency_cdf**: shows percentiles as [CDF](https://en.wikipedia.org/wiki/Cumulative_distribution_function), only available on html.
- **max_latency**: max latency in recent *60s/60m/24h/30d* from *right to left* on html, max latency in recent 10s (by default, specified by [-bvar_dump_interval](http://brpc.baidu.com:8765/flags/bvar_dump_interval)) in plain text.
- **qps**: QPS (Queries Per Second) in recent *60s/60m/24h/30d* from *right to left* on html, QPS in recent 10s (by default, specified by [-bvar_dump_interval](http://brpc.baidu.com:8765/flags/bvar_dump_interval)) in plain text.
- **processing**: Number of requests being processed by the service. If this counter can't hit zero when the traffic to the service becomes zero, the server probably has bugs, such as forgetting to call done->Run() or being stuck on some processing step.
- **processing**: (renamed to concurrency in master) Number of requests being processed by the method. If this counter can't hit zero when the traffic to the service becomes zero, the server probably has bugs, such as forgetting to call done->Run() or being stuck on some processing step.
Users may customize descriptions on /status by letting the service implement [brpc::Describable](https://github.com/brpc/brpc/blob/master/src/brpc/describable.h).
......
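As a side note to the doc change above, here is a minimal sketch of the brpc::Describable customization it mentions. The class name and the description text are illustrative only; a real service would additionally inherit its generated protobuf service base class (e.g. an `EchoService`) and implement its RPC methods.

```cpp
#include <ostream>
#include "brpc/describable.h"

// Hypothetical service; whatever Describe() writes is appended to this
// service's section on the /status page.
class MyEchoService : public brpc::Describable {
public:
    void Describe(std::ostream& os,
                  const brpc::DescribeOptions& /*options*/) const override {
        os << "extra per-service description shown on /status";
    }
};
```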
......@@ -16,14 +16,31 @@
#include <cstring>
#include <strings.h>
#include "butil/string_printf.h"
#include "butil/logging.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/adaptive_max_concurrency.h"
namespace brpc {
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency()
: _value(UNLIMITED())
, _max_concurrency(0) {
}
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(int max_concurrency)
: _max_concurrency(0) {
if (max_concurrency <= 0) {
_value = UNLIMITED();
_max_concurrency = 0;
} else {
_value = butil::string_printf("%d", max_concurrency);
_max_concurrency = max_concurrency;
}
}
inline bool CompareStringPieceWithoutCase(
const butil::StringPiece& s1, const char* s2) {
const butil::StringPiece& s1, const char* s2) {
DCHECK(s2 != NULL);
if (std::strlen(s2) != s1.size()) {
return false;
......@@ -31,41 +48,61 @@ inline bool CompareStringPieceWithoutCase(
return ::strncasecmp(s1.data(), s2, s1.size()) == 0;
}
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(const butil::StringPiece& name) {
if (butil::StringToInt(name, &_max_concurrency) && _max_concurrency >= 0) {
_name = "constant";
} else if (_max_concurrency < 0) {
LOG(FATAL) << "Invalid max_concurrency: " << name;
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(const butil::StringPiece& value)
: _max_concurrency(0) {
int max_concurrency = 0;
if (butil::StringToInt(value, &max_concurrency)) {
operator=(max_concurrency);
} else {
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
value.CopyToString(&_value);
_max_concurrency = -1;
}
}
void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& name) {
void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& value) {
int max_concurrency = 0;
if (butil::StringToInt(name, &max_concurrency) && max_concurrency >= 0) {
_name = "constant";
_max_concurrency = max_concurrency;
} else if (max_concurrency < 0) {
LOG(ERROR) << "Fail to set max_concurrency, invalid value:" << name;
} else if (CompareStringPieceWithoutCase(name, "constant")) {
LOG(WARNING)
<< "If you want to use a constant maximum concurrency, assign "
<< "an integer value directly to ServerOptions.max_concurrency "
<< "like: `server_options.max_concurrency = 1000`";
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
if (butil::StringToInt(value, &max_concurrency)) {
return operator=(max_concurrency);
} else {
_name.assign(name.begin(), name.end());
value.CopyToString(&_value);
_max_concurrency = -1;
}
}
void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
if (max_concurrency <= 0) {
_value = UNLIMITED();
_max_concurrency = 0;
} else {
_value = butil::string_printf("%d", max_concurrency);
_max_concurrency = max_concurrency;
}
}
const std::string& AdaptiveMaxConcurrency::type() const {
if (_max_concurrency > 0) {
return CONSTANT();
} else if (_max_concurrency == 0) {
return UNLIMITED();
} else {
return _value;
}
}
const std::string& AdaptiveMaxConcurrency::UNLIMITED() {
static std::string* s = new std::string("unlimited");
return *s;
}
const std::string& AdaptiveMaxConcurrency::CONSTANT() {
static std::string* s = new std::string("constant");
return *s;
}
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency) {
return CompareStringPieceWithoutCase(concurrency,
adaptive_concurrency.name().c_str());
adaptive_concurrency.value().c_str());
}
} // namespace brpc
......@@ -27,37 +27,45 @@ namespace brpc {
class AdaptiveMaxConcurrency{
public:
AdaptiveMaxConcurrency()
: _name("constant")
, _max_concurrency(0) {}
AdaptiveMaxConcurrency(int max_concurrency)
: _name("constant")
, _max_concurrency(max_concurrency) {}
AdaptiveMaxConcurrency();
AdaptiveMaxConcurrency(int max_concurrency);
AdaptiveMaxConcurrency(const butil::StringPiece& value);
// Non-trivial destructor to prevent AdaptiveMaxConcurrency from being
// passed to variadic arguments without explicit type conversion.
// eg:
// printf("%d", options.max_concurrency) // compile error
// printf("%d", static_cast<int>(options.max_concurrency) // ok
// printf("%s", options.max_concurrency.value().c_str()) // ok
~AdaptiveMaxConcurrency() {}
AdaptiveMaxConcurrency(const butil::StringPiece& name);
void operator=(int max_concurrency) {
_name = "constant";
_max_concurrency = max_concurrency;
}
void operator=(const butil::StringPiece& name);
void operator=(int max_concurrency);
void operator=(const butil::StringPiece& value);
// 0 for type="unlimited"
// >0 for type="constant"
// <0 for type="user-defined"
operator int() const { return _max_concurrency; }
const std::string& name() const { return _name; }
// "unlimited" for type="unlimited"
// "10" "20" "30" for type="constant"
// "user-defined" for type="user-defined"
const std::string& value() const { return _value; }
// "unlimited", "constant" or "user-defined"
const std::string& type() const;
// Get strings filled with "unlimited" and "constant"
static const std::string& UNLIMITED();
static const std::string& CONSTANT();
private:
std::string _name;
std::string _value;
int _max_concurrency;
};
inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {
return os << amc.value();
}
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency);
......
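To summarize the header above, here is a hedged sketch of the expected conversions, with values inferred from the constructors, `operator=` and `type()` shown in this diff ("auto" is just an example limiter name):

```cpp
#include "brpc/adaptive_max_concurrency.h"

void sketch() {
    brpc::AdaptiveMaxConcurrency amc;   // default: unlimited
    // amc.type() == "unlimited", amc.value() == "unlimited", (int)amc == 0

    amc = 100;                          // positive integers -> constant limit
    // amc.type() == "constant", amc.value() == "100", (int)amc == 100

    amc = "10";                         // numeric strings parse to constants too
    // amc.type() == "constant", amc.value() == "10", (int)amc == 10

    amc = "auto";                       // other strings name a user-defined limiter
    // amc.type() == "auto", amc.value() == "auto", (int)amc == -1
}
```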
......@@ -62,17 +62,19 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
server->PrintTabsBody(os, "status");
os << "<div class=\"layer1\">\n";
}
os << "version: " << server->version() << '\n';
// non_service_error
if (use_html) {
os << "<p class=\"variable\">";
}
os << "non_service_error: ";
if (use_html) {
os << "<span id=\"value-" << server->_nerror.name() << "\">";
os << "<span id=\"value-" << server->_nerror_bvar.name() << "\">";
}
os << server->_nerror.get_value();
os << server->_nerror_bvar.get_value();
if (use_html) {
os << "</span></p><div class=\"detail\"><div id=\"" << server->_nerror.name()
os << "</span></p><div class=\"detail\"><div id=\"" << server->_nerror_bvar.name()
<< "\" class=\"flot-placeholder\"></div></div>";
}
os << '\n';
......@@ -95,12 +97,14 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
<< "_connection_count\" class=\"flot-placeholder\"></div></div>";
}
os << '\n';
const AdaptiveMaxConcurrency& max_concurrency =
server->options().max_concurrency;
if (max_concurrency == "constant") {
os << "max_concurrency: " << static_cast<int>(max_concurrency) << '\n';
// max_concurrency
os << "max_concurrency: ";
const int mc = server->options().max_concurrency;
if (mc <= 0) {
os << "unlimited";
} else {
os << "concurrency limiter: " << max_concurrency.name() << '\n';
os << mc;
}
os << '\n';
......@@ -158,9 +162,6 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
if (mp->status && mp->status->max_concurrency() > 0) {
os << " max_concurrency=" << mp->status->max_concurrency();
}
}
os << "</h4>\n";
} else {
......@@ -170,9 +171,6 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
if (mp->status && mp->status->max_concurrency() > 0) {
os << " max_concurrency=" << mp->status->max_concurrency();
}
}
os << '\n';
}
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include "brpc/concurrency_limiter.h"
namespace brpc {
ConcurrencyLimiter* ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency) {
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(max_concurrency.name().c_str());
CHECK(cl != NULL)
<< "Fail to find ConcurrencyLimiter by `"
<< max_concurrency.name() << "'";
ConcurrencyLimiter* cl_copy = cl->New();
CHECK(cl_copy != NULL) << "Fail to new ConcurrencyLimiter";
if (max_concurrency == "constant") {
cl_copy->_max_concurrency = max_concurrency;
}
return cl_copy;
}
} // namespace brpc
......@@ -24,15 +24,15 @@
namespace brpc {
class ConcurrencyLimiter : public Destroyable {
class ConcurrencyLimiter {
public:
ConcurrencyLimiter() : _max_concurrency(0) {}
virtual ~ConcurrencyLimiter() {}
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested() = 0;
virtual bool OnRequested(int current_concurrency) = 0;
// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
......@@ -41,29 +41,13 @@ public:
// still need to call OnResponded.
virtual void OnResponded(int error_code, int64_t latency_us) = 0;
// Returns the current maximum concurrency. Note that the maximum
// concurrency of some ConcurrencyLimiters(eg: `auto', `gradient')
// is dynamically changing.
int max_concurrency() { return _max_concurrency; };
// Expose internal vars. NOT thread-safe.
// Return 0 on success, -1 otherwise.
virtual int Expose(const butil::StringPiece& prefix) = 0;
// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual ConcurrencyLimiter* New() const = 0;
virtual ~ConcurrencyLimiter() {}
// Create ConcurrencyLimiter* and coredump if it fails.
// Caller is responsible for Destroy() the instance after usage.
static ConcurrencyLimiter* CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency);
// Returns the latest max_concurrency.
// The return value is only for logging.
virtual int MaxConcurrency() = 0;
protected:
// Assume int32_t is atomic in x86
int32_t _max_concurrency;
// Create an instance from the amc.
// Caller is responsible for deleting the instance after usage.
virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 0;
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
......
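For illustration, a hedged sketch of a user-defined limiter written against the new interface above. The class and the registration name "fixed" are hypothetical, not part of brpc; the registration call mirrors the pattern in global.cpp later in this diff.

```cpp
#include <new>
#include "brpc/concurrency_limiter.h"

namespace example {

// Rejects requests once more than 200 are being processed concurrently.
class FixedLimiter : public brpc::ConcurrencyLimiter {
public:
    bool OnRequested(int current_concurrency) override {
        return current_concurrency <= 200;
    }
    // A fixed limit learns nothing from responses.
    void OnResponded(int /*error_code*/, int64_t /*latency_us*/) override {}

    int MaxConcurrency() override { return 200; }

    FixedLimiter* New(const brpc::AdaptiveMaxConcurrency& /*amc*/) const override {
        return new (std::nothrow) FixedLimiter;
    }
};

}  // namespace example

// Presumed registration (before starting any server), after which assigning
// "fixed" to a method's max concurrency selects this limiter:
//   brpc::ConcurrencyLimiterExtension()->RegisterOrDie(
//       "fixed", new example::FixedLimiter);
```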
......@@ -115,8 +115,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
// Called Server._cl->OnRequested()
static const uint32_t FLAGS_CONCURRENCY_LIMITER_REQUESTED = (1 << 2);
static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2);
static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3);
static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4);
static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5);
......
......@@ -22,39 +22,46 @@
namespace brpc {
static int cast_nprocessing(void* arg) {
static int cast_int(void* arg) {
return *(int*)arg;
}
static int cast_cl(void* arg) {
auto cl = static_cast<std::unique_ptr<ConcurrencyLimiter>*>(arg)->get();
if (cl) {
return cl->MaxConcurrency();
}
return 0;
}
MethodStatus::MethodStatus()
: _cl(NULL)
, _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nrefused_per_second(&_nrefused_bvar, 1)
, _nprocessing(0) {
: _nconcurrency(0)
, _nconcurrency_bvar(cast_int, &_nconcurrency)
, _eps_bvar(&_nerror_bvar)
, _max_concurrency_bvar(cast_cl, &_cl)
{
}
MethodStatus::~MethodStatus() {
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
}
int MethodStatus::Expose(const butil::StringPiece& prefix) {
if (_nprocessing_bvar.expose_as(prefix, "processing") != 0) {
if (_nconcurrency_bvar.expose_as(prefix, "concurrency") != 0) {
return -1;
}
if (_nrefused_per_second.expose_as(prefix, "refused_per_second") != 0) {
if (_nerror_bvar.expose_as(prefix, "error") != 0) {
return -1;
}
if (_nerror.expose_as(prefix, "error") != 0) {
if (_eps_bvar.expose_as(prefix, "eps") != 0) {
return -1;
}
if (_latency_rec.expose(prefix) != 0) {
return -1;
}
if (NULL != _cl && _cl->Expose(prefix) != 0) {
return -1;
if (_cl) {
if (_max_concurrency_bvar.expose_as(prefix, "max_concurrency") != 0) {
return -1;
}
}
return 0;
}
......@@ -89,19 +96,27 @@ void OutputValue(std::ostream& os,
void MethodStatus::Describe(
std::ostream &os, const DescribeOptions& options) const {
// Sort by alphabetical order to be consistent with /vars.
const int64_t qps = _latency_rec.qps();
const bool expand = (qps != 0);
// success requests
OutputValue(os, "count: ", _latency_rec.count_name(), _latency_rec.count(),
options, false);
OutputValue(os, "error: ", _nerror.name(), _nerror.get_value(),
const int64_t qps = _latency_rec.qps();
const bool expand = (qps != 0);
OutputValue(os, "qps: ", _latency_rec.qps_name(), _latency_rec.qps(),
options, expand);
// erroneous requests
OutputValue(os, "error: ", _nerror_bvar.name(), _nerror_bvar.get_value(),
options, false);
OutputValue(os, "eps: ", _eps_bvar.name(),
_eps_bvar.get_value(1), options, false);
// latencies
OutputValue(os, "latency: ", _latency_rec.latency_name(),
_latency_rec.latency(), options, false);
if (options.use_html) {
OutputValue(os, "latency_percentiles: ",
_latency_rec.latency_percentiles_name(),
_latency_rec.latency_percentiles(), options, expand);
_latency_rec.latency_percentiles(), options, false);
OutputValue(os, "latency_cdf: ", _latency_rec.latency_cdf_name(),
"click to view", options, expand);
} else {
......@@ -118,23 +133,21 @@ void MethodStatus::Describe(
}
OutputValue(os, "max_latency: ", _latency_rec.max_latency_name(),
_latency_rec.max_latency(), options, false);
OutputValue(os, "qps: ", _latency_rec.qps_name(), _latency_rec.qps(),
options, expand);
// Many people were confused by the old name "unresponded", whose "un" is
// generally associated with something wrong. Naming it "processing"
// should be more understandable.
OutputValue(os, "processing: ", _nprocessing_bvar.name(),
_nprocessing, options, false);
// Concurrency
OutputValue(os, "concurrency: ", _nconcurrency_bvar.name(),
_nconcurrency, options, false);
if (_cl) {
OutputValue(os, "max_concurrency: ", _max_concurrency_bvar.name(),
MaxConcurrency(), options, false);
}
}
void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
if (NULL != _cl) {
_cl->Destroy();
}
_cl = cl;
_cl.reset(cl);
}
ScopedMethodStatus::~ScopedMethodStatus() {
ConcurrencyRemover::~ConcurrencyRemover() {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
_status = NULL;
......
......@@ -34,9 +34,9 @@ public:
~MethodStatus();
// Call this function when the method is about to be called.
// Returns false when the request reaches max_concurrency to the method
// and is suggested to be rejected.
bool OnRequested();
// Returns false when the method is overloaded. If rejected_cc is not
// NULL, it's set with the rejected concurrency.
bool OnRequested(int* rejected_cc = NULL);
// Call this when the method just finished.
// `error_code' : The error code obtained from the controller. Equal to
......@@ -53,18 +53,10 @@ public:
// Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const;
// Current maximum concurrency of method.
// Return 0 if the maximum concurrency is not restricted.
int max_concurrency() const {
if (NULL == _cl) {
return 0;
} else {
return _cl->max_concurrency();
}
}
// Current max_concurrency of the method.
int MaxConcurrency() const { return _cl ? _cl->MaxConcurrency() : 0; }
private:
friend class ScopedMethodStatus;
friend class Server;
DISALLOW_COPY_AND_ASSIGN(MethodStatus);
......@@ -72,47 +64,46 @@ friend class Server;
// before the server is started.
void SetConcurrencyLimiter(ConcurrencyLimiter* cl);
ConcurrencyLimiter* _cl;
bvar::Adder<int64_t> _nerror;
std::unique_ptr<ConcurrencyLimiter> _cl;
butil::atomic<int> _nconcurrency;
bvar::Adder<int64_t> _nerror_bvar;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
bvar::Adder<uint32_t> _nrefused_bvar;
bvar::Window<bvar::Adder<uint32_t>> _nrefused_per_second;
butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
bvar::PassiveStatus<int> _nconcurrency_bvar;
bvar::PerSecond<bvar::Adder<int64_t>> _eps_bvar;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
};
class ScopedMethodStatus {
class ConcurrencyRemover {
public:
ScopedMethodStatus(MethodStatus* status,
Controller* c,
int64_t received_us)
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
: _status(status)
, _c(c)
, _received_us(received_us) {}
~ScopedMethodStatus();
operator MethodStatus* () const { return _status; }
~ConcurrencyRemover();
private:
DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus);
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
MethodStatus* _status;
Controller* _c;
uint64_t _received_us;
};
inline bool MethodStatus::OnRequested() {
_nprocessing.fetch_add(1, butil::memory_order_relaxed);
if (NULL == _cl || _cl->OnRequested()) {
inline bool MethodStatus::OnRequested(int* rejected_cc) {
const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
if (NULL == _cl || _cl->OnRequested(cc)) {
return true;
}
_nrefused_bvar << 1;
if (rejected_cc) {
*rejected_cc = cc;
}
return false;
}
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
_nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_latency_rec << latency;
} else {
_nerror << 1;
_nerror_bvar << 1;
}
if (NULL != _cl) {
_cl->OnResponded(error_code, latency);
......
......@@ -35,22 +35,22 @@ public:
}
void AddError() {
_server->_nerror << 1;
_server->_nerror_bvar << 1;
}
// Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) {
if (NULL != _server->_cl) {
c->add_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED);
return _server->_cl->OnRequested();
}
return true;
if (_server->options().max_concurrency <= 0) {
return true;
}
c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
<= _server->options().max_concurrency);
}
void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED)){
CHECK(_server->_cl != NULL);
_server->_cl->OnResponded(c->ErrorCode(), c->latency_us());
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
}
}
......
......@@ -107,7 +107,10 @@ const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
, ch_md5_lb(MD5Hash32){}
, ch_md5_lb(MD5Hash32)
, constant_cl(0) {
}
#ifdef BAIDU_INTERNAL
BaiduNamingService bns;
#endif
......@@ -559,8 +562,8 @@ static void GlobalInitializeOrDieImpl() {
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after
......
......@@ -51,57 +51,32 @@ DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg;
}
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _remeasure_start_us(NextResetTime(butil::gettimeofday_us()))
: _max_concurrency(FLAGS_auto_cl_initial_max_concurrency)
, _remeasure_start_us(NextResetTime(butil::gettimeofday_us()))
, _reset_latency_us(0)
, _min_latency_us(-1)
, _ema_peak_qps(-1)
, _ema_factor(FLAGS_auto_cl_alpha_factor_for_ema)
, _overload_threshold(FLAGS_auto_cl_overload_threshold)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _total_succ_req(0)
, _current_concurrency(0) {
_max_concurrency = FLAGS_auto_cl_initial_max_concurrency;
}
int AutoConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
if (_max_concurrency_bvar.expose_as(prefix, "auto_cl_max_concurrency") != 0) {
return -1;
}
return 0;
, _total_succ_req(0) {
}
AutoConcurrencyLimiter* AutoConcurrencyLimiter::New() const {
AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency&) const {
return new (std::nothrow) AutoConcurrencyLimiter;
}
void AutoConcurrencyLimiter::Destroy() {
delete this;
}
bool AutoConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurrency >= _max_concurrency) {
return false;
}
return true;
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
return current_concurrency <= _max_concurrency;
}
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_total_succ_req.fetch_add(1, butil::memory_order_relaxed);
} else if (ELIMIT == error_code) {
return;
}
int64_t now_time_us = butil::gettimeofday_us();
const int64_t now_time_us = butil::gettimeofday_us();
int64_t last_sampling_time_us =
_last_sampling_time_us.load(butil::memory_order_relaxed);
......@@ -111,17 +86,15 @@ void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) {
LOG_EVERY_N(INFO, 60)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency
<< ", min_latency_us: " << _min_latency_us;
}
AddSample(error_code, latency_us, now_time_us);
}
}
}
int AutoConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency;
}
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
......@@ -129,17 +102,17 @@ int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
return reset_start_us;
}
int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
void AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
// Waiting for the current concurrency to decline
if (_reset_latency_us > sampling_time_us) {
return 0;
}
// Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0) {
if (_reset_latency_us != 0) {
// min_latency is about to be reset soon.
if (_reset_latency_us > sampling_time_us) {
// ignoring samples during waiting for the deadline.
return;
}
// Remeasure min_latency when concurrency has dropped to low load
_min_latency_us = -1;
_reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us);
......@@ -165,24 +138,21 @@ int32_t AutoConcurrencyLimiter::AddSample(int error_code,
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return 0;
return;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return 0;
return;
}
if(_sw.succ_count > 0) {
int max_concurrency = UpdateMaxConcurrency(sampling_time_us);
ResetSampleWindow(sampling_time_us);
return max_concurrency;
UpdateMaxConcurrency(sampling_time_us);
} else {
// All requests failed
_max_concurrency /= 2;
ResetSampleWindow(sampling_time_us);
return 0;
}
ResetSampleWindow(sampling_time_us);
}
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
......@@ -194,26 +164,26 @@ void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
}
void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
if (_min_latency_us <= 0) {
_min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) {
_min_latency_us = latency_us * _ema_factor + _min_latency_us * (1 - _ema_factor);
_min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
}
}
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
if (qps >= _ema_peak_qps) {
_ema_peak_qps = qps;
} else {
_ema_peak_qps =
qps * (_ema_factor / 10) + _ema_peak_qps * (1 - _ema_factor / 10);
_ema_peak_qps = qps * ema_factor + _ema_peak_qps * (1 - ema_factor);
}
}
int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish =
......@@ -223,18 +193,18 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 0.75;
next_max_concurrency = _max_concurrency * 3 / 4;
} else {
const double overload_threshold = FLAGS_auto_cl_overload_threshold;
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
if (avg_latency < (1.0 + _overload_threshold) * _min_latency_us) {
if (avg_latency < (1.0 + overload_threshold) * _min_latency_us) {
next_max_concurrency = std::ceil(noload_concurrency *
(2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
(2.0 + overload_threshold - double(avg_latency) / _min_latency_us));
} else {
next_max_concurrency = noload_concurrency;
}
......@@ -243,7 +213,6 @@ int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
return next_max_concurrency;
}
} // namespace policy
......
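Restating the auto limiter's per-sampling-window update above as formulas (a paraphrase of UpdateMinLatency/UpdateQps/UpdateMaxConcurrency in this diff, not an independent specification). Latencies are in microseconds, $\alpha$ is -auto_cl_alpha_factor_for_ema, $\tau$ is -auto_cl_overload_threshold, $\bar{l}$ is the failure-punished average latency of the window, and $q$ is the successful QPS of the window:

```latex
\begin{aligned}
L_{\min} &\leftarrow \alpha\,\bar{l} + (1-\alpha)\,L_{\min}
  && \text{if } \bar{l} < L_{\min}\ \text{(or } L_{\min} \leftarrow \bar{l}\ \text{if unset)}\\
Q_{\mathrm{peak}} &\leftarrow
  \begin{cases}
    q, & q \ge Q_{\mathrm{peak}}\\[2pt]
    \tfrac{\alpha}{10}\,q + \bigl(1-\tfrac{\alpha}{10}\bigr)\,Q_{\mathrm{peak}}, & \text{otherwise}
  \end{cases}\\
C_{0} &= \bigl\lceil L_{\min}\,Q_{\mathrm{peak}} / 10^{6} \bigr\rceil\\
C_{\max} &\leftarrow
  \begin{cases}
    \bigl\lceil C_{0}\,\bigl(2 + \tau - \bar{l}/L_{\min}\bigr) \bigr\rceil, & \bar{l} < (1+\tau)\,L_{\min}\\[2pt]
    C_{0}, & \text{otherwise}
  \end{cases}
\end{aligned}
```

In addition, when the periodic remeasure deadline is reached, $C_{\max}$ is temporarily cut to $3/4$ of its value while $L_{\min}$ is re-measured, and a sampling window containing only failed requests halves $C_{\max}$.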
......@@ -27,13 +27,14 @@ namespace policy {
class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public:
AutoConcurrencyLimiter();
~AutoConcurrencyLimiter() {}
bool OnRequested() override;
bool OnRequested(int current_concurrency) override;
void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override;
AutoConcurrencyLimiter* New() const override;
void Destroy() override;
int MaxConcurrency() override;
AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
private:
struct SampleWindow {
......@@ -50,31 +51,31 @@ private:
int64_t total_succ_us;
};
int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called
// in AddSample()
int32_t UpdateMaxConcurrency(int64_t sampling_time_us);
void UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
double peak_qps();
SampleWindow _sw;
// modified per sample-window or more
int _max_concurrency;
int64_t _remeasure_start_us;
int64_t _reset_latency_us;
int64_t _min_latency_us;
int64_t _min_latency_us;
double _ema_peak_qps;
const double _ema_factor;
const double _overload_threshold;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
// modified per sample.
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> _total_succ_req;
butil::atomic<int32_t> _current_concurrency;
butil::Mutex _sw_mutex;
SampleWindow _sw;
// modified per request.
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _total_succ_req;
};
} // namespace policy
......
......@@ -138,7 +138,7 @@ void SendRpcResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -147,7 +147,7 @@ void SendRpcResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -392,7 +392,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
......@@ -435,10 +435,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = mp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -19,29 +19,25 @@
namespace brpc {
namespace policy {
bool ConstantConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (_max_concurrency != 0 && current_concurrency >= _max_concurrency) {
return false;
}
return true;
ConstantConcurrencyLimiter::ConstantConcurrencyLimiter(int max_concurrency)
: _max_concurrency(max_concurrency) {
}
void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency) {
return current_concurrency <= _max_concurrency;
}
int ConstantConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
return 0;
void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
}
ConstantConcurrencyLimiter* ConstantConcurrencyLimiter::New() const {
return new (std::nothrow) ConstantConcurrencyLimiter;
int ConstantConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency.load(butil::memory_order_relaxed);
}
void ConstantConcurrencyLimiter::Destroy() {
delete this;
ConstantConcurrencyLimiter*
ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const {
CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT());
return new ConstantConcurrencyLimiter(static_cast<int>(amc));
}
} // namespace policy
......
......@@ -24,19 +24,18 @@ namespace policy {
class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
ConstantConcurrencyLimiter() : _current_concurrency(0) {}
~ConstantConcurrencyLimiter() {}
bool OnRequested() override;
explicit ConstantConcurrencyLimiter(int max_concurrency);
bool OnRequested(int current_concurrency) override;
void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
void Destroy() override;
int MaxConcurrency() override;
ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
private:
butil::atomic<int32_t> _current_concurrency;
butil::atomic<int> _max_concurrency;
};
} // namespace policy
......
......@@ -550,7 +550,7 @@ static void SendHttpResponse(Controller *cntl,
const google::protobuf::Message *req,
const google::protobuf::Message *res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -558,7 +558,7 @@ static void SendHttpResponse(Controller *cntl,
span->set_start_send_us(butil::cpuwide_time_us());
}
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw,cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
Socket* socket = accessor.get_sending_socket();
......@@ -1166,10 +1166,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
non_service_error.release();
MethodStatus* method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
}
......@@ -1187,7 +1187,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -224,7 +224,7 @@ static void SendHuluResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -233,7 +233,7 @@ static void SendHuluResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -424,7 +424,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -452,10 +452,10 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -60,7 +60,7 @@ SendMongoResponse::~SendMongoResponse() {
void SendMongoResponse::Run() {
std::unique_ptr<SendMongoResponse> delete_self(this);
ScopedMethodStatus method_status(status, &cntl, received_us);
ConcurrencyRemover concurrency_remover(status, &cntl, received_us);
Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket();
if (cntl.IsCloseConnection()) {
......@@ -222,7 +222,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -241,11 +241,11 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
MethodStatus* method_status = mp->status;
mongo_done->status = method_status;
if (method_status) {
if (!method_status->OnRequested()) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -71,8 +71,8 @@ void NsheadClosure::Run() {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(_server->options().nshead_service->_status,
&_controller, _received_us);
MethodStatus* method_status = _server->options().nshead_service->_status;
ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
if (!method_status) {
// Judge which counter the error belongs to;
// may not be accurate, but it does not matter too much.
......@@ -294,7 +294,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -208,7 +208,7 @@ static void SendSofaResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -217,7 +217,7 @@ static void SendSofaResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -388,7 +388,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -408,10 +408,10 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -232,10 +232,9 @@ void ThriftClosure::DoRun() {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(
server->options().thrift_service ?
server->options().thrift_service->_status : NULL,
&_controller, _received_us);
MethodStatus* method_status = (server->options().thrift_service ?
server->options().thrift_service->_status : NULL);
ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
if (!method_status) {
// Judge which counter the error belongs to;
// may not be accurate, but it does not matter too much.
......
......@@ -272,7 +272,7 @@ void* Server::UpdateDerivedVars(void* arg) {
std::vector<SocketId> conns;
std::vector<SocketId> internal_conns;
server->_nerror.expose_as(prefix, "error");
server->_nerror_bvar.expose_as(prefix, "error");
bvar::PassiveStatus<timeval> uptime_st(
prefix, "uptime", GetUptime, (void*)(intptr_t)start_us);
......@@ -382,7 +382,9 @@ Server::Server(ProfilerLinker)
, _last_start_time(0)
, _derivative_thread(INVALID_BTHREAD)
, _keytable_pool(NULL)
, _cl(NULL) {
, _concurrency(0) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
Server_concurrency_must_be_aligned_by_cacheline);
}
Server::~Server() {
......@@ -423,10 +425,6 @@ Server::~Server() {
delete _options.auth;
_options.auth = NULL;
}
if (_cl) {
_cl->Destroy();
_cl = NULL;
}
}
int Server::AddBuiltinServices() {
......@@ -664,6 +662,27 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(amc.type().c_str());
if (cl == NULL) {
LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
return false;
}
ConcurrencyLimiter* cl_copy = cl->New(amc);
if (cl_copy == NULL) {
LOG(ERROR) << "Fail to new ConcurrencyLimiter";
return false;
}
*out = cl_copy;
return true;
}
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method = 0;
int Server::StartInternal(const butil::ip_t& ip,
......@@ -835,6 +854,8 @@ int Server::StartInternal(const butil::ip_t& ip,
}
}
}
_concurrency = 0;
if (_options.has_builtin_services &&
_builtin_service_count <= 0 &&
......@@ -872,28 +893,21 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads);
}
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
if (_options.max_concurrency != "constant" ||
static_cast<int>(_options.max_concurrency) != 0) {
_cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
_options.max_concurrency);
_cl->Expose("Server_Concurrency_Limiter");
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL);
} else if (it->second.max_concurrency == "constant" &&
static_cast<int>(it->second.max_concurrency) == 0) {
it->second.status->SetConcurrencyLimiter(NULL);
} else {
it->second.status->SetConcurrencyLimiter(
ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
it->second.max_concurrency));
const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
}
}
......@@ -1986,16 +2000,13 @@ bool Server::ClearCertMapping(CertMaps& bg) {
}
int Server::ResetMaxConcurrency(int max_concurrency) {
LOG(WARNING) << "ResetMaxConcurrency is already deprecated";
return 0;
}
int Server::max_concurrency() const {
if (NULL != _cl) {
return _cl->max_concurrency();
} else {
return g_default_max_concurrency_of_method;
if (!IsRunning()) {
LOG(WARNING) << "ResetMaxConcurrency is only allowd for a Running Server";
return -1;
}
// Assume that modifying an int32 is atomic on x86
_options.max_concurrency = max_concurrency;
return 0;
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
......
......@@ -36,7 +36,6 @@
#include "brpc/builtin/tabbed.h"
#include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h"
#include "brpc/concurrency_limiter.h"
#include "brpc/adaptive_max_concurrency.h"
extern "C" {
......@@ -101,14 +100,14 @@ struct ServerOptions {
// Default: #cpu-cores
int num_threads;
// Limit number of requests processed in parallel. To limit the max
// concurrency of a method, use server.MaxConcurrencyOf("xxx") instead.
// Server-level max concurrency.
// "concurrency" = "number of requests processed in parallel"
//
// In a traditional server, number of pthread workers also limits
// concurrency. However brpc runs requests in bthreads which are
// mapped to pthread workers, when a bthread context switches, it gives
// the pthread worker to another bthread, yielding a higher concurrency
// than number of pthreads. In some situation, higher concurrency may
// than number of pthreads. In some situations, higher concurrency may
// consume more resources, to protect the server from running out of
// resources, you may set this option.
// If the server reaches the limitation, it responds client with ELIMIT
......@@ -116,12 +115,11 @@ struct ServerOptions {
// shall try another server.
// NOTE: accesses to builtin services are not limited by this option.
// Default: 0 (unlimited)
// NOTE: Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// even if you have set a maximum concurrency through `MaxConcurrencyOf`.
AdaptiveMaxConcurrency max_concurrency;
int max_concurrency;
// Default value of method-level max concurrencies,
// overridable by Server.MaxConcurrencyOf().
AdaptiveMaxConcurrency method_max_concurrency;
// -------------------------------------------------------
// Differences between session-local and thread-local data
......@@ -484,13 +482,9 @@ public:
// current_tab_name is the tab highlighted.
void PrintTabsBody(std::ostream& os, const char* current_tab_name) const;
// This method is already deprecated. You should NOT call it anymore.
int ResetMaxConcurrency(int max_concurrency);
// Server's current max concurrency
int max_concurrency() const;
// Get/set max_concurrency associated with a method.
// Example:
// server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
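Tying the options and the accessor above together, a hedged configuration sketch: `EchoServiceImpl`, the method name and the port are placeholders, and the call order mirrors common brpc usage rather than anything this diff prescribes.

```cpp
#include "brpc/server.h"

int main() {
    brpc::Server server;
    EchoServiceImpl echo_service;  // hypothetical user service
    if (server.AddService(&echo_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        return -1;
    }

    // Per-method override, as in the comment above; takes effect on Start().
    server.MaxConcurrencyOf("example.EchoService.Echo") = 10;

    brpc::ServerOptions options;
    options.max_concurrency = 1024;           // server-level limit, 0 = unlimited
    options.method_max_concurrency = "auto";  // default limiter for other methods

    if (server.Start(8000, &options) != 0) {
        return -1;
    }
    server.RunUntilAskedToQuit();
    return 0;
}
```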
......@@ -659,11 +653,10 @@ friend class Controller;
bthread_keytable_pool_t* _keytable_pool;
// FIXME: Temporarily for `ServerPrivateAccessor' to change this bvar
// Replace `ServerPrivateAccessor' with other private-access
// mechanism
mutable bvar::Adder<int64_t> _nerror;
ConcurrencyLimiter* _cl;
// mutable is required for `ServerPrivateAccessor' to change this bvar
mutable bvar::Adder<int64_t> _nerror_bvar;
mutable int32_t BAIDU_CACHELINE_ALIGNMENT _concurrency;
};
// Get the data attached to current searching thread. The data is created by
......
......@@ -158,7 +158,7 @@ inline const IOBuf::BlockRef& IOBuf::_ref_at(size_t i) const {
inline const IOBuf::BlockRef* IOBuf::_pref_at(size_t i) const {
if (_small()) {
return i < (!!_sv.refs[0].block + !!_sv.refs[1].block) ? &_sv.refs[i] : NULL;
return i < (size_t)(!!_sv.refs[0].block + !!_sv.refs[1].block) ? &_sv.refs[i] : NULL;
} else {
return i < _bv.nref ? &_bv.ref_at(i) : NULL;
}
......
......@@ -255,7 +255,7 @@ TEST_F(HttpTest, process_request_failed_socket) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckResponseCode(true, 0);
}
......@@ -263,12 +263,12 @@ TEST_F(HttpTest, reject_get_to_pb_services_with_required_fields) {
brpc::policy::HttpContext* msg = MakeGetRequestMessage("/EchoService/Echo");
_server._status = brpc::Server::RUNNING;
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
const brpc::Server::MethodProperty* mp =
_server.FindMethodPropertyByFullName("test.EchoService.Echo");
ASSERT_TRUE(mp);
ASSERT_TRUE(mp->status);
ASSERT_EQ(1ll, mp->status->_nerror.get_value());
ASSERT_EQ(1ll, mp->status->_nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_BAD_REQUEST);
}
......@@ -276,14 +276,14 @@ TEST_F(HttpTest, process_request_logoff) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_SERVICE_UNAVAILABLE);
}
TEST_F(HttpTest, process_request_wrong_method) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/NO_SUCH_METHOD");
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_NOT_FOUND);
}
......
......@@ -213,7 +213,7 @@ TEST_F(HuluTest, process_request_failed_socket) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckResponseCode(true, 0);
}
......@@ -224,7 +224,7 @@ TEST_F(HuluTest, process_request_logoff) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::ELOGOFF);
}
......@@ -234,7 +234,7 @@ TEST_F(HuluTest, process_request_wrong_method) {
meta.set_method_index(10);
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::ENOMETHOD);
}
......
......@@ -147,7 +147,7 @@ TEST_F(MongoTest, process_request_logoff) {
ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessMongoRequest, req_pr.message(), false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
}
TEST_F(MongoTest, process_request_failed_socket) {
......@@ -162,7 +162,7 @@ TEST_F(MongoTest, process_request_failed_socket) {
ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessMongoRequest, req_pr.message(), false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
}
TEST_F(MongoTest, complete_flow) {
......
......@@ -155,7 +155,7 @@ TEST_F(NovaTest, process_request_failed_socket) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(head);
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckEmptyResponse();
}
......@@ -165,7 +165,7 @@ TEST_F(NovaTest, process_request_logoff) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(head);
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
ASSERT_TRUE(_socket->Failed());
CheckEmptyResponse();
}
......@@ -175,7 +175,7 @@ TEST_F(NovaTest, process_request_wrong_method) {
head.reserved = 10;
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(head);
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
ASSERT_TRUE(_socket->Failed());
CheckEmptyResponse();
}
......
......@@ -191,7 +191,7 @@ TEST_F(PublicPbrpcTest, process_request_failed_socket) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckResponseCode(true, 0);
}
......@@ -204,7 +204,7 @@ TEST_F(PublicPbrpcTest, process_request_logoff) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::ELOGOFF);
}
......@@ -216,7 +216,7 @@ TEST_F(PublicPbrpcTest, process_request_wrong_method) {
body->set_id(0);
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(&meta);
ProcessMessage(brpc::policy::ProcessNsheadRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
ASSERT_FALSE(_socket->Failed());
}
......
......@@ -206,7 +206,7 @@ TEST_F(SofaTest, process_request_failed_socket) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror.get_value());
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckResponseCode(true, 0);
}
......@@ -218,7 +218,7 @@ TEST_F(SofaTest, process_request_logoff) {
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::ELOGOFF);
}
......@@ -229,7 +229,7 @@ TEST_F(SofaTest, process_request_wrong_method) {
meta.set_method("EchoService.NO_SUCH_METHOD");
brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
ProcessMessage(brpc::policy::ProcessSofaRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror.get_value());
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::ENOMETHOD);
}
......