Commit 684d7e57 authored by Ge Jun's avatar Ge Jun

revert code on server-level mc & add default value for method-level mc

parent 89bbc2c3
...@@ -35,22 +35,22 @@ public: ...@@ -35,22 +35,22 @@ public:
} }
void AddError() { void AddError() {
_server->_nerror << 1; _server->_nerror_bvar << 1;
} }
// Returns true if the `max_concurrency' limit is not reached. // Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) { bool AddConcurrency(Controller* c) {
if (NULL != _server->_cl) { if (_server->options().max_concurrency <= 0) {
c->add_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED); return true;
return _server->_cl->OnRequested(); }
} c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return true; return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
<= _server->options().max_concurrency);
} }
void RemoveConcurrency(const Controller* c) { void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED)){ if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
CHECK(_server->_cl != NULL); butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
_server->_cl->OnResponded(c->ErrorCode(), c->latency_us());
} }
} }
......
...@@ -272,7 +272,8 @@ void* Server::UpdateDerivedVars(void* arg) { ...@@ -272,7 +272,8 @@ void* Server::UpdateDerivedVars(void* arg) {
std::vector<SocketId> conns; std::vector<SocketId> conns;
std::vector<SocketId> internal_conns; std::vector<SocketId> internal_conns;
server->_nerror.expose_as(prefix, "error"); server->_nerror_bvar.expose_as(prefix, "error");
//server->_nrefused_bvar.expose_as(prefix, "refused");
bvar::PassiveStatus<timeval> uptime_st( bvar::PassiveStatus<timeval> uptime_st(
prefix, "uptime", GetUptime, (void*)(intptr_t)start_us); prefix, "uptime", GetUptime, (void*)(intptr_t)start_us);
...@@ -382,7 +383,9 @@ Server::Server(ProfilerLinker) ...@@ -382,7 +383,9 @@ Server::Server(ProfilerLinker)
, _last_start_time(0) , _last_start_time(0)
, _derivative_thread(INVALID_BTHREAD) , _derivative_thread(INVALID_BTHREAD)
, _keytable_pool(NULL) , _keytable_pool(NULL)
, _cl(NULL) { , _concurrency(0) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
Server_concurrency_must_be_aligned_by_cacheline);
} }
Server::~Server() { Server::~Server() {
...@@ -423,10 +426,6 @@ Server::~Server() { ...@@ -423,10 +426,6 @@ Server::~Server() {
delete _options.auth; delete _options.auth;
_options.auth = NULL; _options.auth = NULL;
} }
if (_cl) {
_cl->Destroy();
_cl = NULL;
}
} }
int Server::AddBuiltinServices() { int Server::AddBuiltinServices() {
...@@ -664,6 +663,27 @@ static int get_port_from_fd(int fd) { ...@@ -664,6 +663,27 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port); return ntohs(addr.sin_port);
} }
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(amc.type().c_str());
if (cl == NULL) {
LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
return false;
}
ConcurrencyLimiter* cl_copy = cl->New(amc);
if (cl_copy == NULL) {
LOG(ERROR) << "Fail to new ConcurrencyLimiter";
return false;
}
*out = cl_copy;
return true;
}
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method = 0; static AdaptiveMaxConcurrency g_default_max_concurrency_of_method = 0;
int Server::StartInternal(const butil::ip_t& ip, int Server::StartInternal(const butil::ip_t& ip,
...@@ -835,6 +855,8 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -835,6 +855,8 @@ int Server::StartInternal(const butil::ip_t& ip,
} }
} }
} }
_concurrency = 0;
if (_options.has_builtin_services && if (_options.has_builtin_services &&
_builtin_service_count <= 0 && _builtin_service_count <= 0 &&
...@@ -872,28 +894,21 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -872,28 +894,21 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads); bthread_setconcurrency(_options.num_threads);
} }
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
if (_options.max_concurrency != "constant" ||
static_cast<int>(_options.max_concurrency) != 0) {
_cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
_options.max_concurrency);
_cl->Expose("Server_Concurrency_Limiter");
}
for (MethodMap::iterator it = _method_map.begin(); for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) { it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) { if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL); it->second.status->SetConcurrencyLimiter(NULL);
} else if (it->second.max_concurrency == "constant" &&
static_cast<int>(it->second.max_concurrency) == 0) {
it->second.status->SetConcurrencyLimiter(NULL);
} else { } else {
it->second.status->SetConcurrencyLimiter( const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
ConcurrencyLimiter::CreateConcurrencyLimiterOrDie( if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
it->second.max_concurrency)); amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
} }
} }
...@@ -1986,16 +2001,13 @@ bool Server::ClearCertMapping(CertMaps& bg) { ...@@ -1986,16 +2001,13 @@ bool Server::ClearCertMapping(CertMaps& bg) {
} }
int Server::ResetMaxConcurrency(int max_concurrency) { int Server::ResetMaxConcurrency(int max_concurrency) {
LOG(WARNING) << "ResetMaxConcurrency is already deprecated"; if (!IsRunning()) {
return 0; LOG(WARNING) << "ResetMaxConcurrency is only allowd for a Running Server";
} return -1;
int Server::max_concurrency() const {
if (NULL != _cl) {
return _cl->max_concurrency();
} else {
return g_default_max_concurrency_of_method;
} }
// Assume that modifying int32 is atomical in X86
_options.max_concurrency = max_concurrency;
return 0;
} }
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) { AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include "brpc/builtin/tabbed.h" #include "brpc/builtin/tabbed.h"
#include "brpc/details/profiler_linker.h" #include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h" #include "brpc/health_reporter.h"
#include "brpc/concurrency_limiter.h"
#include "brpc/adaptive_max_concurrency.h" #include "brpc/adaptive_max_concurrency.h"
extern "C" { extern "C" {
...@@ -101,14 +100,14 @@ struct ServerOptions { ...@@ -101,14 +100,14 @@ struct ServerOptions {
// Default: #cpu-cores // Default: #cpu-cores
int num_threads; int num_threads;
// Limit number of requests processed in parallel. To limit the max // Server-level max concurrency.
// concurrency of a method, use server.MaxConcurrencyOf("xxx") instead. // "concurrency" = "number of requests processed in parallel"
// //
// In a traditional server, number of pthread workers also limits // In a traditional server, number of pthread workers also limits
// concurrency. However brpc runs requests in bthreads which are // concurrency. However brpc runs requests in bthreads which are
// mapped to pthread workers, when a bthread context switches, it gives // mapped to pthread workers, when a bthread context switches, it gives
// the pthread worker to another bthread, yielding a higher concurrency // the pthread worker to another bthread, yielding a higher concurrency
// than number of pthreads. In some situation, higher concurrency may // than number of pthreads. In some situations, higher concurrency may
// consume more resources, to protect the server from running out of // consume more resources, to protect the server from running out of
// resources, you may set this option. // resources, you may set this option.
// If the server reaches the limitation, it responds client with ELIMIT // If the server reaches the limitation, it responds client with ELIMIT
...@@ -116,12 +115,11 @@ struct ServerOptions { ...@@ -116,12 +115,11 @@ struct ServerOptions {
// shall try another server. // shall try another server.
// NOTE: accesses to builtin services are not limited by this option. // NOTE: accesses to builtin services are not limited by this option.
// Default: 0 (unlimited) // Default: 0 (unlimited)
// NOTE: Once you have chosen the automatic concurrency limit strategy, brpc int max_concurrency;
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency, // Default value of method-level max concurrencies,
// even if you have set a maximum concurrency through `MaxConcurrencyOf`. // Overridable by Server.MaxConcurrencyOf().
AdaptiveMaxConcurrency method_max_concurrency;
AdaptiveMaxConcurrency max_concurrency;
// ------------------------------------------------------- // -------------------------------------------------------
// Differences between session-local and thread-local data // Differences between session-local and thread-local data
...@@ -484,13 +482,9 @@ public: ...@@ -484,13 +482,9 @@ public:
// current_tab_name is the tab highlighted. // current_tab_name is the tab highlighted.
void PrintTabsBody(std::ostream& os, const char* current_tab_name) const; void PrintTabsBody(std::ostream& os, const char* current_tab_name) const;
// This method is already deprecated.You should NOT call it anymore. // This method is already deprecated.You should NOT call it anymore.
int ResetMaxConcurrency(int max_concurrency); int ResetMaxConcurrency(int max_concurrency);
// Server's current max concurrency
int max_concurrency() const;
// Get/set max_concurrency associated with a method. // Get/set max_concurrency associated with a method.
// Example: // Example:
// server.MaxConcurrencyOf("example.EchoService.Echo") = 10; // server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
...@@ -659,11 +653,10 @@ friend class Controller; ...@@ -659,11 +653,10 @@ friend class Controller;
bthread_keytable_pool_t* _keytable_pool; bthread_keytable_pool_t* _keytable_pool;
// FIXME: Temporarily for `ServerPrivateAccessor' to change this bvar // mutable is required for `ServerPrivateAccessor' to change this bvar
// Replace `ServerPrivateAccessor' with other private-access mutable bvar::Adder<int64_t> _nerror_bvar;
// mechanism mutable int32_t BAIDU_CACHELINE_ALIGNMENT _concurrency;
mutable bvar::Adder<int64_t> _nerror;
ConcurrencyLimiter* _cl;
}; };
// Get the data attached to current searching thread. The data is created by // Get the data attached to current searching thread. The data is created by
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment