Commit e69935d8 authored by helei's avatar helei Committed by TousakaRin

auto max_concurrency limiter

parent d84ba761
......@@ -39,7 +39,7 @@ ConnectionType StringToConnectionType(const butil::StringPiece& type,
}
LOG_IF(ERROR, print_log_on_unknown && !type.empty())
<< "Unknown connection_type `" << type
<< "', supported types: single pooled short";
<< "`, supported types: single pooled short";
return CONNECTION_TYPE_UNKNOWN;
}
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include <cstring>
#include <strings.h>
#include "butil/logging.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/adaptive_max_concurrency.h"
namespace brpc {
inline bool CompareStringPieceWithoutCase(
const butil::StringPiece& s1, const char* s2) {
DCHECK(s2 != NULL);
if (std::strlen(s2) != s1.size()) {
return false;
}
return ::strncasecmp(s1.data(), s2, s1.size()) == 0;
}
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(const butil::StringPiece& name) {
if (butil::StringToInt(name, &_max_concurrency) && _max_concurrency >= 0) {
_name = "constant";
} else if (_max_concurrency < 0) {
LOG(FATAL) << "Invalid max_concurrency: " << name;
} else {
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
}
}
void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& name) {
int max_concurrency = 0;
if (butil::StringToInt(name, &max_concurrency) && max_concurrency >= 0) {
_name = "constant";
_max_concurrency = max_concurrency;
} else if (max_concurrency < 0) {
LOG(ERROR) << "Fail to set max_concurrency, invalid value:" << name;
} else if (CompareStringPieceWithoutCase(name, "constant")) {
LOG(WARNING)
<< "If you want to use a constant maximum concurrency, assign "
<< "an integer value directly to ServerOptions.max_concurrency "
<< "like: `server_options.max_concurrency = 1000`";
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
} else {
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
}
}
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency) {
return CompareStringPieceWithoutCase(concurrency,
adaptive_concurrency.name().c_str());
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_ADAPTIVE_MAX_CONCURRENCY_H
#define BRPC_ADAPTIVE_MAX_CONCURRENCY_H
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
#include "butil/strings/string_piece.h"
#include "brpc/options.pb.h"
namespace brpc {
class AdaptiveMaxConcurrency{
public:
AdaptiveMaxConcurrency()
: _name("constant")
, _max_concurrency(0) {}
AdaptiveMaxConcurrency(int max_concurrency)
: _name("constant")
, _max_concurrency(max_concurrency) {}
AdaptiveMaxConcurrency(const butil::StringPiece& name);
void operator=(int max_concurrency) {
_name = "constant";
_max_concurrency = max_concurrency;
}
void operator=(const butil::StringPiece& name);
operator int() const { return _max_concurrency; }
const std::string& name() const { return _name; }
private:
std::string _name;
int _max_concurrency;
};
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency);
inline bool operator==(const butil::StringPiece& concurrency,
const AdaptiveMaxConcurrency& adaptive_concurrency) {
return adaptive_concurrency == concurrency;
}
inline bool operator!=(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency) {
return !(adaptive_concurrency == concurrency);
}
inline bool operator!=(const butil::StringPiece& concurrency,
const AdaptiveMaxConcurrency& adaptive_concurrency) {
return !(adaptive_concurrency == concurrency);
}
} // namespace brpc
#endif // BRPC_ADAPTIVE_MAX_CONCURRENCY_H
......@@ -95,9 +95,12 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
<< "_connection_count\" class=\"flot-placeholder\"></div></div>";
}
os << '\n';
const int max_concurrency = server->options().max_concurrency;
if (max_concurrency > 0) {
os << "max_concurrency: " << max_concurrency << '\n';
const AdaptiveMaxConcurrency max_concurrency =
server->options().max_concurrency;
if (max_concurrency == "constant") {
os << "max_concurrency: " << static_cast<int>(max_concurrency) << '\n';
} else {
os << "concurrency limiter: " << max_concurrency.name() << '\n';
}
os << '\n';
......@@ -155,7 +158,8 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
if (mp->status && mp->status->max_concurrency() > 0) {
const MethodStatus* mp_status = mp->status;
if (NULL != mp_status && mp_status->max_concurrency() > 0) {
os << " max_concurrency=" << mp->status->max_concurrency();
}
}
......@@ -167,8 +171,9 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
if (mp->http_url) {
os << " @" << *mp->http_url;
}
if (mp->status && mp->status->max_concurrency() > 0) {
os << " max_concurrency=" << mp->status->max_concurrency();
const MethodStatus* mp_status = mp->status;
if (NULL != mp_status && mp_status->max_concurrency() > 0) {
os << " max_concurrency=" << mp_status->max_concurrency();
}
}
os << '\n';
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_CONCURRENCY_LIMITER_H
#define BRPC_CONCURRENCY_LIMITER_H
#include "bvar/passive_status.h"
#include "brpc/describable.h"
#include "brpc/destroyable.h"
#include "brpc/adaptive_max_concurrency.h"
#include "brpc/extension.h" // Extension<T>
namespace brpc {
class ConcurrencyLimiter : public NonConstDescribable, public Destroyable {
public:
ConcurrencyLimiter() {}
virtual bool OnRequested() = 0;
virtual void OnResponded(int error_code, int64_t latency) = 0;
virtual ConcurrencyLimiter* New() const = 0;
virtual int Expose(const butil::StringPiece& prefix) = 0;
virtual ~ConcurrencyLimiter() {}
virtual int MaxConcurrency() const = 0;
virtual int& MaxConcurrency() = 0;
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
return Extension<const ConcurrencyLimiter>::instance();
}
} // namespace brpc
#endif // BRPC_CONCURRENCY_LIMITER_H
......@@ -17,6 +17,7 @@
#include <limits>
#include "butil/macros.h"
#include "brpc/details/method_status.h"
#include "brpc/controller.h"
namespace brpc {
......@@ -25,24 +26,34 @@ static int cast_nprocessing(void* arg) {
}
MethodStatus::MethodStatus()
: _max_concurrency(0)
: _cl(NULL)
, _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nprocessing(0) {
}
, _nrefused_per_second(&_nrefused_bvar, 1)
, _nprocessing(0) {}
MethodStatus::~MethodStatus() {
if (_cl) {
_cl->Destroy();
_cl = NULL;
}
}
int MethodStatus::Expose(const butil::StringPiece& prefix) {
if (_nprocessing_bvar.expose_as(prefix, "processing") != 0) {
return -1;
}
if (_nrefused_per_second.expose_as(prefix, "refused_per_second") != 0) {
return -1;
}
if (_nerror.expose_as(prefix, "error") != 0) {
return -1;
}
if (_latency_rec.expose(prefix) != 0) {
return -1;
}
if (_cl->Expose(prefix) != 0) {
return -1;
}
return 0;
}
......@@ -114,4 +125,11 @@ void MethodStatus::Describe(
_nprocessing, options, false);
}
ScopedMethodStatus::~ScopedMethodStatus() {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _start_parse_us);
_status = NULL;
}
}
} // namespace brpc
......@@ -20,10 +20,13 @@
#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bvar/bvar.h" // vars
#include "brpc/describable.h"
#include "brpc/concurrency_limiter.h"
namespace brpc {
class Server;
class Controller;
// Record accessing stats of a method.
class MethodStatus : public Describable {
public:
......@@ -36,12 +39,12 @@ public:
bool OnRequested();
// Call this when the method just finished.
// `success' : successful call or not.
// `error_code' : The error code obtained from the controller. Equal to
// 0 when the call is successful.
// `latency_us' : microseconds taken by a successful call. Latency can
// be measured in this utility class as well, but the callsite often
// did the time keeping and the cost is better saved. If `success' is
// false, `latency_us' is not used.
void OnResponded(bool success, int64_t latency_us);
// did the time keeping and the cost is better saved.
void OnResponded(int error_code, int64_t latency_us);
// Expose internal vars.
// Return 0 on success, -1 otherwise.
......@@ -50,18 +53,24 @@ public:
// Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const;
int max_concurrency() const { return _max_concurrency; }
int& max_concurrency() { return _max_concurrency; }
int max_concurrency() const {
return const_cast<const ConcurrencyLimiter*>(_cl)->MaxConcurrency();
}
int& max_concurrency() { return _cl->MaxConcurrency(); }
private:
friend class ScopedMethodStatus;
friend class Server;
DISALLOW_COPY_AND_ASSIGN(MethodStatus);
void OnError();
int _max_concurrency;
bvar::Adder<int64_t> _nerror;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
ConcurrencyLimiter* _cl;
bvar::Adder<int64_t> _nerror;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
bvar::Adder<uint32_t> _nrefused_bvar;
bvar::Window<bvar::Adder<uint32_t>> _nrefused_per_second;
butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
};
......@@ -69,13 +78,12 @@ friend class ScopedMethodStatus;
// an error will be counted.
class ScopedMethodStatus {
public:
ScopedMethodStatus(MethodStatus* status) : _status(status) {}
~ScopedMethodStatus() {
if (_status) {
_status->OnError();
_status = NULL;
}
}
ScopedMethodStatus(MethodStatus* status, Controller* c,
int64_t start_parse_us)
: _status(status)
, _c(c)
, _start_parse_us(start_parse_us) {}
~ScopedMethodStatus();
MethodStatus* release() {
MethodStatus* tmp = _status;
_status = NULL;
......@@ -85,22 +93,27 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus);
MethodStatus* _status;
Controller* _c;
uint64_t _start_parse_us;
};
inline bool MethodStatus::OnRequested() {
const int last_nproc = _nprocessing.fetch_add(1, butil::memory_order_relaxed);
// _max_concurrency may be changed by user at any time.
const int saved_max_concurrency = _max_concurrency;
return (saved_max_concurrency <= 0 || last_nproc < saved_max_concurrency);
_nprocessing.fetch_add(1, butil::memory_order_relaxed);
bool should_refuse = !_cl->OnRequested();
if (should_refuse) {
_nrefused_bvar << 1;
}
return !should_refuse;
}
inline void MethodStatus::OnResponded(bool success, int64_t latency) {
if (success) {
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
if (error_code == 0) {
_latency_rec << latency;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
} else {
OnError();
}
_cl->OnResponded(error_code, latency);
}
inline void MethodStatus::OnError() {
......
......@@ -38,25 +38,15 @@ public:
_server->_nerror << 1;
}
// Returns true iff the `max_concurrency' limit is not reached.
// Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) {
if (_server->options().max_concurrency <= 0) {
return true;
}
if (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
<= _server->options().max_concurrency) {
c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return true;
}
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
return false;
c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return _server->_cl->OnRequested();
}
// Remove the increment of AddConcurrency(). Must not be called when
// AddConcurrency() returned false.
void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
_server->_cl->OnResponded(c->ErrorCode(), c->latency_us());
}
}
......
......@@ -65,6 +65,11 @@
# include "brpc/policy/thrift_protocol.h"
#endif
// Concurrency Limiters
#include "brpc/concurrency_limiter.h"
#include "brpc/policy/gradient_concurrency_limiter.h"
#include "brpc/policy/constant_concurrency_limiter.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList
#include "brpc/server.h"
......@@ -99,6 +104,7 @@ using namespace policy;
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
void __attribute__((weak)) RegisterThriftProtocol();
struct GlobalExtensions {
GlobalExtensions()
......@@ -120,6 +126,9 @@ struct GlobalExtensions {
ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb;
DynPartLoadBalancer dynpart_lb;
GradientConcurrencyLimiter gradient_cl;
ConstantConcurrencyLimiter constant_cl;
};
static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
......@@ -550,6 +559,11 @@ static void GlobalInitializeOrDieImpl() {
}
}
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->gradient_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("gradient", &g_ext->gradient_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after
......
......@@ -146,7 +146,7 @@ void SendRpcResponse(int64_t correlation_id,
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(method_status_raw);
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -266,7 +266,7 @@ void SendRpcResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -395,8 +395,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>(server->options().max_concurrency));
break;
}
......@@ -442,7 +443,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
const_cast<const MethodStatus*>(method_status)->max_concurrency());
break;
}
}
......@@ -514,7 +515,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(), cntl.release(),
req.release(), res.release(), server,
<<<<<<< HEAD
method_status, msg->received_us());
=======
method_status, start_parse_us);
>>>>>>> auto max_concurrency limiter
}
bool VerifyRpcRequest(const InputMessageBase* msg_base) {
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include "brpc/policy/constant_concurrency_limiter.h"
namespace brpc {
namespace policy {
bool ConstantConcurrencyLimiter::OnRequested() {
const int32_t current_concurreny =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (_max_concurrency != 0 && current_concurreny >= _max_concurrency) {
return false;
}
return true;
}
void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
int ConstantConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency;
}
int& ConstantConcurrencyLimiter::MaxConcurrency() {
return _max_concurrency;
}
int ConstantConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
return 0;
}
ConstantConcurrencyLimiter* ConstantConcurrencyLimiter::New() const {
return new (std::nothrow) ConstantConcurrencyLimiter;
}
void ConstantConcurrencyLimiter::Destroy() {
delete this;
}
void ConstantConcurrencyLimiter::Describe(
std::ostream& os, const DescribeOptions& options) {
if (!options.verbose) {
os << "constant_cl";
return;
}
os << "Constant{";
os << "current_max_concurrency:"
<< _max_concurrency;
os << '}';
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
ConstantConcurrencyLimiter()
: _max_concurrency(0),
_current_concurrency(0) {}
~ConstantConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
virtual int MaxConcurrency() const override;
virtual int& MaxConcurrency() override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
void Destroy() override;
void Describe(std::ostream&, const DescribeOptions& options) override;
private:
int32_t _max_concurrency;
butil::atomic<int32_t> _current_concurrency;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
This diff is collapsed.
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
class GradientConcurrencyLimiter : public ConcurrencyLimiter {
public:
GradientConcurrencyLimiter();
~GradientConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int MaxConcurrency() const override;
// For compatibility with the MaxConcurrencyOf() interface. When using
// an automatic concurrency adjustment strategy, you should not manually
// adjust the maximum concurrency. In spite of this, calling this
// interface is safe and it can normally return the current maximum
// concurrency. But your changes to the maximum concurrency will not take
// effect.
int& MaxConcurrency() override;
int Expose(const butil::StringPiece& prefix) override;
GradientConcurrencyLimiter* New() const override;
void Destroy() override;
void Describe(std::ostream&, const DescribeOptions& options) override;
private:
struct SampleWindow {
SampleWindow()
: start_time_us(0)
, succ_count(0)
, failed_count(0)
, min_latency_us(-1)
, total_failed_us(0)
, total_succ_us(0) {}
int64_t start_time_us;
int32_t succ_count;
int32_t failed_count;
int64_t min_latency_us;
int64_t total_failed_us;
int64_t total_succ_us;
};
struct WindowSnap {
WindowSnap(int64_t latency_us, int32_t concurrency)
: avg_latency_us(latency_us)
, actuall_concurrency(concurrency) {}
int64_t avg_latency_us;
int32_t actuall_concurrency;
};
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
//NOT thread-safe, should be called in AddSample()
void UpdateConcurrency();
void ResetSampleWindow(int64_t sampling_time_us);
SampleWindow _sw;
std::vector<WindowSnap> _ws_queue;
uint32_t _ws_index;
int32_t _unused_max_concurrency;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _max_concurrency;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _current_concurrency;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
......@@ -557,7 +557,7 @@ static void SendHttpResponse(Controller *cntl,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw);
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -729,7 +729,7 @@ static void SendHttpResponse(Controller *cntl,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -1172,10 +1172,19 @@ void ProcessHttpRequest(InputMessageBase *msg) {
MethodStatus* method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
<<<<<<< HEAD
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
=======
cntl->SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status);
>>>>>>> auto max_concurrency limiter
}
}
......@@ -1191,9 +1200,16 @@ void ProcessHttpRequest(InputMessageBase *msg) {
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (!server_accessor.AddConcurrency(cntl.get())) {
<<<<<<< HEAD
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
=======
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>((server->options().max_concurrency)));
return SendHttpResponse(cntl.release(), server, method_status);
>>>>>>> auto max_concurrency limiter
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
......
......@@ -231,7 +231,7 @@ static void SendHuluResponse(int64_t correlation_id,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw);
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
......@@ -320,7 +320,7 @@ static void SendHuluResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -429,7 +429,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
static_cast<int>(server->options().max_concurrency));
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -460,7 +460,8 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
break;
}
}
......
......@@ -60,7 +60,7 @@ SendMongoResponse::~SendMongoResponse() {
void SendMongoResponse::Run() {
std::unique_ptr<SendMongoResponse> delete_self(this);
ScopedMethodStatus method_status(status);
ScopedMethodStatus method_status(status, &cntl, butil::cpuwide_time_us());
Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket();
if (cntl.IsCloseConnection()) {
......@@ -104,7 +104,7 @@ void SendMongoResponse::Run() {
}
if (method_status) {
method_status.release()->OnResponded(
!cntl.Failed(), butil::cpuwide_time_us() - received_us);
cntl.ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -224,8 +224,9 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
}
if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
mongo_done->cntl.SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
mongo_done->cntl.SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>(server->options().max_concurrency));
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -248,7 +249,8 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
break;
}
}
......
......@@ -72,7 +72,8 @@ void NsheadClosure::Run() {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(_server->options().nshead_service->_status);
ScopedMethodStatus method_status(_server->options().nshead_service->_status,
&_controller, butil::cpuwide_time_us());
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
......@@ -125,7 +126,7 @@ void NsheadClosure::Run() {
}
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - _received_us);
_controller.ErrorCode(), butil::cpuwide_time_us() - _received_us);
}
}
......@@ -296,8 +297,9 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
break;
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>(server->options().max_concurrency));
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -215,7 +215,7 @@ static void SendSofaResponse(int64_t correlation_id,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw);
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
......@@ -296,7 +296,7 @@ static void SendSofaResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -391,8 +391,9 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>(server->options().max_concurrency));
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -415,7 +416,8 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
const_cast<MethodStatus*>(
method_status)->max_concurrency());
break;
}
}
......
......@@ -425,6 +425,10 @@ Server::~Server() {
delete _options.auth;
_options.auth = NULL;
}
if (_cl) {
_cl->Destroy();
_cl = NULL;
}
}
int Server::AddBuiltinServices() {
......@@ -662,6 +666,8 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}
static int g_default_max_concurrency_of_method = 0;
int Server::StartInternal(const butil::ip_t& ip,
const PortRange& port_range,
const ServerOptions *opt) {
......@@ -832,8 +838,6 @@ int Server::StartInternal(const butil::ip_t& ip,
}
}
_concurrency = 0;
if (_options.has_builtin_services &&
_builtin_service_count <= 0 &&
AddBuiltinServices() != 0) {
......@@ -870,6 +874,50 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads);
}
// Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// unless you have set a constant maximum concurrency for this method
// before starting the server.
const ConcurrencyLimiter* cl
= ConcurrencyLimiterExtension()->Find("constant");
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrentLimiter by `constant`";
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to new ConcurrencyLimiter";
}
_cl = cl_copy;
if (_options.max_concurrency == "constant") {
_cl->MaxConcurrency() = _options.max_concurrency;
} else {
_cl->MaxConcurrency() = 0;
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (NULL != it->second.status->_cl) {
continue;
}
const ConcurrencyLimiter* cl = NULL;
const std::string cl_name = it->second.is_builtin_service ?
"constant" : _options.max_concurrency.name();
cl = ConcurrencyLimiterExtension()->Find(cl_name.c_str());
if (NULL == cl) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
}
ConcurrencyLimiter* cl_copy = cl->New();
if (NULL == cl_copy) {
LOG(FATAL) << "Fail to find ConcurrencyLimiter by `"
<< _options.max_concurrency.name() << '`';
return -1;
}
it->second.status->_cl = cl_copy;
}
// Create listening ports
if (port_range.min_port > port_range.max_port) {
LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
......@@ -1963,13 +2011,17 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
LOG(WARNING) << "ResetMaxConcurrency is only allowd for a Running Server";
return -1;
}
// Assume that modifying int32 is atomical in X86
_options.max_concurrency = max_concurrency;
if (_options.max_concurrency != "constant") {
LOG(WARNING)
<< "ResetMaxConcurrency is only allowed for "
"constant concurrency limiter";
return -1;
} else {
_cl->MaxConcurrency() = max_concurrency;
}
return 0;
}
static int g_default_max_concurrency_of_method = 0;
int& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
......@@ -1984,7 +2036,8 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (mp == NULL || mp->status == NULL) {
return 0;
}
return mp->status->max_concurrency();
const MethodStatus* mp_status = mp->status;
return mp_status->max_concurrency();
}
int& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
......@@ -2072,4 +2125,4 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
}
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
} // namespace brpc
} // namespace brpc
......@@ -36,6 +36,7 @@
#include "brpc/builtin/tabbed.h"
#include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h"
#include "brpc/concurrency_limiter.h"
extern "C" {
struct ssl_ctx_st;
......@@ -114,7 +115,13 @@ struct ServerOptions {
// shall try another server.
// NOTE: accesses to builtin services are not limited by this option.
// Default: 0 (unlimited)
int max_concurrency;
// NOTE: Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// unless you have set a maximum concurrency for this method before starting
// the server.
AdaptiveMaxConcurrency max_concurrency;
// -------------------------------------------------------
// Differences between session-local and thread-local data
......@@ -476,10 +483,15 @@ public:
// current_tab_name is the tab highlighted.
void PrintTabsBody(std::ostream& os, const char* current_tab_name) const;
// Reset the max_concurrency set by ServerOptions.max_concurrency after
// Server is started.
// Server is started.
// The concurrency will be limited by the new value if this function is
// successfully returned.
// Note: You may call this interface ONLY if you use the CONSTANT
// maximum concurrency, like `options.max_concurrency = 1000`. If you
// have chosen another maximum concurrency limit policy,
// eg: `options.max_concurrency = "auto"`, it will directly return -1.
// Returns 0 on success, -1 otherwise.
int ResetMaxConcurrency(int max_concurrency);
......@@ -488,6 +500,12 @@ public:
// server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
// or server.MaxConcurrencyOf("example.EchoService", "Echo") = 10;
// or server.MaxConcurrencyOf(&service, "Echo") = 10;
// or server.MaxConcurrencyOf(&service, "Echo") = "auto";
// Note: You should NOT set the max_concurrency when you have choosen an
// auto concurrency limiter, eg `options.max_concurrency = "auto"`.If you
// still called non-const version of the interface, it would return the
// method's current maximum concurrency correctly. But your changes to the
// maximum concurrency will not take effect.
int& MaxConcurrencyOf(const butil::StringPiece& full_method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_method_name) const;
......@@ -650,6 +668,7 @@ friend class Controller;
// Replace `ServerPrivateAccessor' with other private-access
// mechanism
mutable bvar::Adder<int64_t> _nerror;
ConcurrencyLimiter* _cl;
mutable int32_t BAIDU_CACHELINE_ALIGNMENT _concurrency;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment