Commit cd79c8c8 authored by Ge Jun's avatar Ge Jun

Small changes around MethodStatus

parent 2cb180f3
......@@ -115,8 +115,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
// Called Server._cl->OnRequested()
static const uint32_t FLAGS_CONCURRENCY_LIMITER_REQUESTED = (1 << 2);
static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2);
static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3);
static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4);
static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5);
......
......@@ -22,39 +22,46 @@
namespace brpc {
static int cast_nprocessing(void* arg) {
static int cast_int(void* arg) {
return *(int*)arg;
}
static int cast_cl(void* arg) {
auto cl = static_cast<std::unique_ptr<ConcurrencyLimiter>*>(arg)->get();
if (cl) {
return cl->MaxConcurrency();
}
return 0;
}
MethodStatus::MethodStatus()
: _cl(NULL)
, _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nrefused_per_second(&_nrefused_bvar, 1)
, _nprocessing(0) {
: _nconcurrency(0)
, _nconcurrency_bvar(cast_int, &_nconcurrency)
, _eps_bvar(&_nerror)
, _max_concurrency_bvar(cast_cl, &_cl)
{
}
MethodStatus::~MethodStatus() {
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
}
int MethodStatus::Expose(const butil::StringPiece& prefix) {
if (_nprocessing_bvar.expose_as(prefix, "processing") != 0) {
if (_nconcurrency_bvar.expose_as(prefix, "concurrency") != 0) {
return -1;
}
if (_nrefused_per_second.expose_as(prefix, "refused_per_second") != 0) {
if (_nerror.expose_as(prefix, "error") != 0) {
return -1;
}
if (_nerror.expose_as(prefix, "error") != 0) {
if (_eps_bvar.expose_as(prefix, "eps") != 0) {
return -1;
}
if (_latency_rec.expose(prefix) != 0) {
return -1;
}
if (NULL != _cl && _cl->Expose(prefix) != 0) {
return -1;
if (_cl) {
if (_max_concurrency_bvar.expose_as(prefix, "max_concurrency") != 0) {
return -1;
}
}
return 0;
}
......@@ -89,19 +96,27 @@ void OutputValue(std::ostream& os,
void MethodStatus::Describe(
std::ostream &os, const DescribeOptions& options) const {
// Sort by alphebetical order to be consistent with /vars.
const int64_t qps = _latency_rec.qps();
const bool expand = (qps != 0);
// success requests
OutputValue(os, "count: ", _latency_rec.count_name(), _latency_rec.count(),
options, false);
const int64_t qps = _latency_rec.qps();
const bool expand = (qps != 0);
OutputValue(os, "qps: ", _latency_rec.qps_name(), _latency_rec.qps(),
options, expand);
// errorous requests
OutputValue(os, "error: ", _nerror.name(), _nerror.get_value(),
options, false);
OutputValue(os, "eps: ", _eps_bvar.name(),
_eps_bvar.get_value(1), options, false);
// latencies
OutputValue(os, "latency: ", _latency_rec.latency_name(),
_latency_rec.latency(), options, false);
if (options.use_html) {
OutputValue(os, "latency_percentiles: ",
_latency_rec.latency_percentiles_name(),
_latency_rec.latency_percentiles(), options, expand);
_latency_rec.latency_percentiles(), options, false);
OutputValue(os, "latency_cdf: ", _latency_rec.latency_cdf_name(),
"click to view", options, expand);
} else {
......@@ -118,23 +133,21 @@ void MethodStatus::Describe(
}
OutputValue(os, "max_latency: ", _latency_rec.max_latency_name(),
_latency_rec.max_latency(), options, false);
OutputValue(os, "qps: ", _latency_rec.qps_name(), _latency_rec.qps(),
options, expand);
// Many people are confusing with the old name "unresponded" which
// contains "un" generally associated with something wrong. Name it
// to "processing" should be more understandable.
OutputValue(os, "processing: ", _nprocessing_bvar.name(),
_nprocessing, options, false);
// Concurrency
OutputValue(os, "concurrency: ", _nconcurrency_bvar.name(),
_nconcurrency, options, false);
if (_cl) {
OutputValue(os, "max_concurrency: ", _max_concurrency_bvar.name(),
MaxConcurrency(), options, false);
}
}
void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
if (NULL != _cl) {
_cl->Destroy();
}
_cl = cl;
_cl.reset(cl);
}
ScopedMethodStatus::~ScopedMethodStatus() {
ConcurrencyRemover::~ConcurrencyRemover() {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
_status = NULL;
......
......@@ -34,9 +34,9 @@ public:
~MethodStatus();
// Call this function when the method is about to be called.
// Returns false when the request reaches max_concurrency to the method
// and is suggested to be rejected.
bool OnRequested();
// Returns false when the method is overloaded. If rejected_cc is not
// NULL, it's set with the rejected concurrency.
bool OnRequested(int* rejected_cc = NULL);
// Call this when the method just finished.
// `error_code' : The error code obtained from the controller. Equal to
......@@ -53,18 +53,10 @@ public:
// Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const;
// Current maximum concurrency of method.
// Return 0 if the maximum concurrency is not restricted.
int max_concurrency() const {
if (NULL == _cl) {
return 0;
} else {
return _cl->max_concurrency();
}
}
// Current max_concurrency of the method.
int MaxConcurrency() const { return _cl ? _cl->MaxConcurrency() : 0; }
private:
friend class ScopedMethodStatus;
friend class Server;
DISALLOW_COPY_AND_ASSIGN(MethodStatus);
......@@ -72,43 +64,42 @@ friend class Server;
// before the server is started.
void SetConcurrencyLimiter(ConcurrencyLimiter* cl);
ConcurrencyLimiter* _cl;
std::unique_ptr<ConcurrencyLimiter> _cl;
butil::atomic<int> _nconcurrency;
bvar::Adder<int64_t> _nerror;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
bvar::Adder<uint32_t> _nrefused_bvar;
bvar::Window<bvar::Adder<uint32_t>> _nrefused_per_second;
butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
bvar::PassiveStatus<int> _nconcurrency_bvar;
bvar::PerSecond<bvar::Adder<int64_t>> _eps_bvar;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
};
class ScopedMethodStatus {
class ConcurrencyRemover {
public:
ScopedMethodStatus(MethodStatus* status,
Controller* c,
int64_t received_us)
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
: _status(status)
, _c(c)
, _received_us(received_us) {}
~ScopedMethodStatus();
operator MethodStatus* () const { return _status; }
~ConcurrencyRemover();
private:
DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus);
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
MethodStatus* _status;
Controller* _c;
uint64_t _received_us;
};
inline bool MethodStatus::OnRequested() {
_nprocessing.fetch_add(1, butil::memory_order_relaxed);
if (NULL == _cl || _cl->OnRequested()) {
inline bool MethodStatus::OnRequested(int* rejected_cc) {
const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
if (NULL == _cl || _cl->OnRequested(cc)) {
return true;
}
_nrefused_bvar << 1;
if (rejected_cc) {
*rejected_cc = cc;
}
return false;
}
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
_nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_latency_rec << latency;
} else {
......
......@@ -107,7 +107,10 @@ const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
, ch_md5_lb(MD5Hash32){}
, ch_md5_lb(MD5Hash32)
, constant_cl(0) {
}
#ifdef BAIDU_INTERNAL
BaiduNamingService bns;
#endif
......@@ -559,8 +562,8 @@ static void GlobalInitializeOrDieImpl() {
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after
......
......@@ -138,7 +138,7 @@ void SendRpcResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -147,7 +147,7 @@ void SendRpcResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -392,7 +392,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
......@@ -435,10 +435,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = mp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -550,7 +550,7 @@ static void SendHttpResponse(Controller *cntl,
const google::protobuf::Message *req,
const google::protobuf::Message *res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -558,7 +558,7 @@ static void SendHttpResponse(Controller *cntl,
span->set_start_send_us(butil::cpuwide_time_us());
}
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw,cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
Socket* socket = accessor.get_sending_socket();
......@@ -1166,10 +1166,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
non_service_error.release();
MethodStatus* method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
}
......@@ -1187,7 +1187,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -224,7 +224,7 @@ static void SendHuluResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -233,7 +233,7 @@ static void SendHuluResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -424,7 +424,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -452,10 +452,10 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -60,7 +60,7 @@ SendMongoResponse::~SendMongoResponse() {
void SendMongoResponse::Run() {
std::unique_ptr<SendMongoResponse> delete_self(this);
ScopedMethodStatus method_status(status, &cntl, received_us);
ConcurrencyRemover concurrency_remover(status, &cntl, received_us);
Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket();
if (cntl.IsCloseConnection()) {
......@@ -222,7 +222,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -241,11 +241,11 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
MethodStatus* method_status = mp->status;
mongo_done->status = method_status;
if (method_status) {
if (!method_status->OnRequested()) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
method_status->max_concurrency());
ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -71,8 +71,8 @@ void NsheadClosure::Run() {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(_server->options().nshead_service->_status,
&_controller, _received_us);
MethodStatus* method_status = _server->options().nshead_service->_status;
ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
......@@ -294,7 +294,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -208,7 +208,7 @@ static void SendSofaResponse(int64_t correlation_id,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
MethodStatus* method_status,
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
......@@ -217,7 +217,7 @@ static void SendSofaResponse(int64_t correlation_id,
}
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
......@@ -388,7 +388,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......@@ -408,10 +408,10 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
non_service_error.release();
method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
break;
}
}
......
......@@ -232,10 +232,9 @@ void ThriftClosure::DoRun() {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(
server->options().thrift_service ?
MethodStatus* method_status = server->options().thrift_service ?
server->options().thrift_service->_status : NULL,
&_controller, _received_us);
ConcurrencyRemover concurrency_remover(method_status, &_controller, _received_us);
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment