Commit b7509aea authored by TousakaRin's avatar TousakaRin

1. Set the default value of cl to NULL 2. Modify the interface such as…

1. Set the default value of cl to NULL  2. Modify the interface such as MaxConcurrencyOf, and the call will be given an error after the service is started.
parent ab94e75a
......@@ -158,9 +158,10 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
const MethodStatus* mp_status = mp->status;
if (NULL != mp_status && mp_status->max_concurrency() > 0) {
os << " max_concurrency=" << mp_status->max_concurrency();
if (NULL != mp->status &&
mp->status->max_concurrency() > 0) {
os << " current_max_concurrency="
<< mp->status->max_concurrency();
}
}
os << "</h4>\n";
......@@ -171,9 +172,10 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
const MethodStatus* mp_status = mp->status;
if (NULL != mp_status && mp_status->max_concurrency() > 0) {
os << " max_concurrency=" << mp_status->max_concurrency();
if (NULL != mp->status &&
mp->status->max_concurrency() > 0) {
os << " max_concurrency="
<< mp->status->max_concurrency();
}
}
os << '\n';
......
......@@ -27,21 +27,40 @@ class ConcurrencyLimiter : public NonConstDescribable, public Destroyable {
public:
ConcurrencyLimiter() {}
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested() = 0;
virtual void OnResponded(int error_code, int64_t latency) = 0;
// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
// `latency' : Microseconds taken by RPC.
// NOTE: Even if OnRequested returns false, after sending ELIMIT, you
// still need to call OnResponded.
virtual void OnResponded(int error_code, int64_t latency_us) = 0;
virtual ConcurrencyLimiter* New() const = 0;
virtual int Expose(const butil::StringPiece& prefix) = 0;
// Returns the current maximum concurrency. Note that the maximum
// concurrency of some ConcurrencyLimiters(eg: `auto', `gradient')
// is dynamically changing.
virtual int MaxConcurrency() const = 0;
virtual ~ConcurrencyLimiter() {}
// Returns the reference of maximum concurrency. mainly used to explicitly
// specify the maximum concurrency. This method can only be called before
// the server starts.
// NOTE: When using automatic concurrency limiter(eg: `auto', `gradient'),
// the specified maximum concurrency will NOT take effect.
virtual int& MaxConcurrencyRef() = 0;
virtual int MaxConcurrency() const = 0;
// Expose internal vars. NOT thread-safe.
// Return 0 on success, -1 otherwise.
virtual int Expose(const butil::StringPiece& prefix) = 0;
virtual int& MaxConcurrency() = 0;
// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual ConcurrencyLimiter* New() const = 0;
virtual int CurrentMaxConcurrency() const = 0;
virtual ~ConcurrencyLimiter() {}
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
......
......@@ -31,20 +31,10 @@ MethodStatus::MethodStatus()
, _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nrefused_per_second(&_nrefused_bvar, 1)
, _nprocessing(0) {
const ConcurrencyLimiter* cl
= ConcurrencyLimiterExtension()->Find("constant");
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrentLimiter by `constant`";
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
}
MethodStatus::~MethodStatus() {
if (_cl) {
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
......@@ -63,7 +53,7 @@ int MethodStatus::Expose(const butil::StringPiece& prefix) {
if (_latency_rec.expose(prefix) != 0) {
return -1;
}
if (_cl->Expose(prefix) != 0) {
if (NULL != _cl && _cl->Expose(prefix) != 0) {
return -1;
}
return 0;
......
......@@ -53,18 +53,35 @@ public:
// Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const;
int current_max_concurrency() const {
return _cl->CurrentMaxConcurrency();
int max_concurrency() const {
if (NULL == _cl) {
return 0;
} else {
return _cl->MaxConcurrency();
}
}
int max_concurrency() const {
return const_cast<const ConcurrencyLimiter*>(_cl)->MaxConcurrency();
// Note: This method is not thread safe and can only be called before
// the server is started.
int& max_concurrency_ref() {
if (NULL == _cl) {
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find("constant");
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrentLimiter by `constant`";
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
}
return _cl->MaxConcurrencyRef();
}
int& max_concurrency() { return _cl->MaxConcurrency(); }
void ResetConcurrencyLimiter(ConcurrencyLimiter* cl) {
if (_cl) {
void SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
if (NULL != _cl) {
_cl->Destroy();
}
_cl = cl;
......@@ -105,7 +122,7 @@ private:
inline bool MethodStatus::OnRequested() {
_nprocessing.fetch_add(1, butil::memory_order_relaxed);
if (_cl->OnRequested()) {
if (NULL == _cl || _cl->OnRequested()) {
return true;
}
_nrefused_bvar << 1;
......@@ -119,7 +136,9 @@ inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
} else {
_nerror << 1;
}
_cl->OnResponded(error_code, latency);
if (NULL != _cl) {
_cl->OnResponded(error_code, latency);
}
}
} // namespace brpc
......
......@@ -41,11 +41,16 @@ public:
// Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) {
c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return _server->_cl->OnRequested();
if (NULL != _server->_cl) {
return _server->_cl->OnRequested();
} else {
return true;
}
}
void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY) &&
NULL != _server->_cl) {
_server->_cl->OnResponded(c->ErrorCode(), c->latency_us());
}
}
......
......@@ -439,7 +439,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->current_max_concurrency());
method_status->max_concurrency());
break;
}
}
......
......@@ -32,15 +32,11 @@ void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
int ConstantConcurrencyLimiter::CurrentMaxConcurrency() const {
return _max_concurrency;
}
int ConstantConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency;
}
int& ConstantConcurrencyLimiter::MaxConcurrency() {
int& ConstantConcurrencyLimiter::MaxConcurrencyRef() {
return _max_concurrency;
}
......
......@@ -32,9 +32,8 @@ public:
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int CurrentMaxConcurrency() const override;
int MaxConcurrency() const override;
int& MaxConcurrency() override;
int& MaxConcurrencyRef() override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
......
......@@ -86,19 +86,11 @@ void GradientConcurrencyLimiter::Describe(
os << '}';
}
int GradientConcurrencyLimiter::CurrentMaxConcurrency() const {
return _max_concurrency.load(butil::memory_order_relaxed);
}
int GradientConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency.load(butil::memory_order_relaxed);
}
int& GradientConcurrencyLimiter::MaxConcurrency() {
_unused_max_concurrency
= _max_concurrency.load(butil::memory_order_relaxed);
LOG_EVERY_N(WARNING, 1000)
<< "Try to modify max concurrency in automatic limiter(gradient)";
int& GradientConcurrencyLimiter::MaxConcurrencyRef() {
return _unused_max_concurrency;
}
......
......@@ -30,17 +30,8 @@ public:
~GradientConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int CurrentMaxConcurrency() const override;
int MaxConcurrency() const override;
// For compatibility with the MaxConcurrencyOf() interface. When using
// an automatic concurrency adjustment strategy, you should not manually
// adjust the maximum concurrency. In spite of this, calling this
// interface is safe and it can normally return the current maximum
// concurrency. But your changes to the maximum concurrency will not take
// effect.
int& MaxConcurrency() override;
int& MaxConcurrencyRef() override;
int Expose(const butil::StringPiece& prefix) override;
GradientConcurrencyLimiter* New() const override;
......
......@@ -456,7 +456,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->current_max_concurrency());
method_status->max_concurrency());
break;
}
}
......
......@@ -246,7 +246,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->current_max_concurrency());
method_status->max_concurrency());
break;
}
}
......
......@@ -412,7 +412,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->current_max_concurrency());
method_status->max_concurrency());
break;
}
}
......
......@@ -619,17 +619,6 @@ int Server::InitializeOnce() {
LOG(ERROR) << "Fail to init _ssl_ctx_map";
return -1;
}
const ConcurrencyLimiter* cl
= ConcurrencyLimiterExtension()->Find("constant");
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrentLimiter by `constant`";
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
_status = READY;
return 0;
}
......@@ -886,35 +875,38 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads);
}
// Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// unless you have set a constant maximum concurrency for this method
// before starting the server.
if (_options.max_concurrency == "constant") {
_cl->MaxConcurrency() = _options.max_concurrency;
} else {
_cl->MaxConcurrency() = 0;
if (_options.max_concurrency == "constant" &&
static_cast<int>(_options.max_concurrency) != 0) {
const ConcurrencyLimiter* constant_cl =
ConcurrencyLimiterExtension()->Find("constant");
if (NULL == constant_cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `constant'";
}
ConcurrencyLimiter* cl_copy = constant_cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
_cl->MaxConcurrencyRef() = _options.max_concurrency;
} else if (_options.max_concurrency != "constant") {
const ConcurrencyLimiter* cl = NULL;
cl = ConcurrencyLimiterExtension()->Find(
_options.max_concurrency.name().c_str());
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
continue;
}
const ConcurrencyLimiter* cl = NULL;
cl = ConcurrencyLimiterExtension()->Find(
_options.max_concurrency.name().c_str());
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
it->second.status->ResetConcurrencyLimiter(cl_copy);
it->second.status->SetConcurrencyLimiter(cl_copy);
}
}
......@@ -2007,32 +1999,29 @@ bool Server::ClearCertMapping(CertMaps& bg) {
}
int Server::ResetMaxConcurrency(int max_concurrency) {
if (!IsRunning()) {
LOG(WARNING) << "ResetMaxConcurrency is only allowd for a Running Server";
return -1;
}
if (_options.max_concurrency != "constant") {
LOG(WARNING)
<< "ResetMaxConcurrency is only allowed for "
"constant concurrency limiter";
return -1;
} else {
_cl->MaxConcurrency() = max_concurrency;
}
LOG(WARNING) << "ResetMaxConcurrency is already deprecated";
return 0;
}
int& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return mp->status->max_concurrency();
return mp->status->max_concurrency_ref();
}
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp == NULL || mp->status == NULL) {
return 0;
}
......
......@@ -119,7 +119,7 @@ struct ServerOptions {
// NOTE: Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// even if you have set a maximum concurrency through `SetMaxConcurrencyOf`.
// even if you have set a maximum concurrency through `MaxConcurrencyOf`.
AdaptiveMaxConcurrency max_concurrency;
......@@ -484,15 +484,7 @@ public:
void PrintTabsBody(std::ostream& os, const char* current_tab_name) const;
// Reset the max_concurrency set by ServerOptions.max_concurrency after
// Server is started.
// The concurrency will be limited by the new value if this function is
// successfully returned.
// Note: You may call this interface ONLY if you use the CONSTANT
// maximum concurrency, like `options.max_concurrency = 1000`. If you
// have chosen another maximum concurrency limit policy,
// eg: `options.max_concurrency = "auto"`, it will directly return -1.
// Returns 0 on success, -1 otherwise.
// This method is already deprecated.You should NOT call it anymore.
int ResetMaxConcurrency(int max_concurrency);
// Get/set max_concurrency associated with a method.
......@@ -500,11 +492,10 @@ public:
// server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
// or server.MaxConcurrencyOf("example.EchoService", "Echo") = 10;
// or server.MaxConcurrencyOf(&service, "Echo") = 10;
// or server.MaxConcurrencyOf(&service, "Echo") = "auto";
// Note: You should NOT set the max_concurrency when you have choosen an
// auto concurrency limiter, eg `options.max_concurrency = "auto"`.If you
// still called non-const version of the interface, it would return the
// method's current maximum concurrency correctly. But your changes to the
// Note: These interfaces can ONLY be called before the server is started.
// And you should NOT set the max_concurrency when you are going to choose
// an auto concurrency limiter, eg `options.max_concurrency = "auto"`.If you
// still called non-const version of the interface, your changes to the
// maximum concurrency will not take effect.
int& MaxConcurrencyOf(const butil::StringPiece& full_method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_method_name) const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment