Commit 34594d97 authored by TousakaRin's avatar TousakaRin

fix comments

parent 268fa1dc
......@@ -39,7 +39,7 @@ ConnectionType StringToConnectionType(const butil::StringPiece& type,
}
LOG_IF(ERROR, print_log_on_unknown && !type.empty())
<< "Unknown connection_type `" << type
<< "`, supported types: single pooled short";
<< "', supported types: single pooled short";
return CONNECTION_TYPE_UNKNOWN;
}
......
......@@ -40,6 +40,8 @@ public:
virtual int MaxConcurrency() const = 0;
virtual int& MaxConcurrency() = 0;
virtual int CurrentMaxConcurrency() const = 0;
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
......
......@@ -16,8 +16,9 @@
#include <limits>
#include "butil/macros.h"
#include "brpc/details/method_status.h"
#include "brpc/controller.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/method_status.h"
namespace brpc {
......@@ -141,6 +142,7 @@ ScopedMethodStatus::~ScopedMethodStatus() {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _start_parse_us);
_status = NULL;
}
ServerPrivateAccessor(_server).RemoveConcurrency(_c);
}
} // namespace brpc
......@@ -26,6 +26,7 @@
namespace brpc {
class Controller;
class Server;
// Record accessing stats of a method.
class MethodStatus : public Describable {
public:
......@@ -52,6 +53,10 @@ public:
// Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const;
int current_max_concurrency() const {
return _cl->CurrentMaxConcurrency();
}
int max_concurrency() const {
return const_cast<const ConcurrencyLimiter*>(_cl)->MaxConcurrency();
}
......@@ -68,7 +73,6 @@ public:
private:
friend class ScopedMethodStatus;
DISALLOW_COPY_AND_ASSIGN(MethodStatus);
void OnError();
ConcurrencyLimiter* _cl;
bvar::Adder<int64_t> _nerror;
......@@ -79,53 +83,45 @@ friend class ScopedMethodStatus;
butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
};
// If release() is not called before destruction of this object,
// an error will be counted.
class ScopedMethodStatus {
public:
ScopedMethodStatus(MethodStatus* status, Controller* c,
ScopedMethodStatus(MethodStatus* status,
const Server* server,
Controller* c,
int64_t start_parse_us)
: _status(status)
, _server(server)
, _c(c)
, _start_parse_us(start_parse_us) {}
~ScopedMethodStatus();
MethodStatus* release() {
MethodStatus* tmp = _status;
_status = NULL;
return tmp;
}
operator MethodStatus* () const { return _status; }
private:
DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus);
MethodStatus* _status;
const Server* _server;
Controller* _c;
uint64_t _start_parse_us;
};
inline bool MethodStatus::OnRequested() {
_nprocessing.fetch_add(1, butil::memory_order_relaxed);
bool should_refuse = !_cl->OnRequested();
if (should_refuse) {
_nrefused_bvar << 1;
if (_cl->OnRequested()) {
return true;
}
return !should_refuse;
_nrefused_bvar << 1;
return false;
}
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_latency_rec << latency;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
} else {
OnError();
_nerror << 1;
}
_cl->OnResponded(error_code, latency);
}
inline void MethodStatus::OnError() {
_nerror << 1;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
}
} // namespace brpc
......
......@@ -118,20 +118,6 @@ private:
const Server* _server;
};
class ScopedRemoveConcurrency {
public:
ScopedRemoveConcurrency(const Server* server, const Controller* c)
: _server(server), _cntl(c) {}
~ScopedRemoveConcurrency() {
ServerPrivateAccessor(_server).RemoveConcurrency(_cntl);
}
private:
DISALLOW_COPY_AND_ASSIGN(ScopedRemoveConcurrency);
const Server* _server;
const Controller* _cntl;
};
} // namespace brpc
......
......@@ -146,11 +146,11 @@ void SendRpcResponse(int64_t correlation_id,
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, server,
cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
StreamId response_stream_id = accessor.response_stream();
......@@ -264,10 +264,6 @@ void SendRpcResponse(int64_t correlation_id,
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
struct CallMethodInBackupThreadArgs {
......@@ -443,7 +439,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
const_cast<const MethodStatus*>(method_status)->max_concurrency());
method_status->current_max_concurrency());
break;
}
}
......@@ -515,11 +511,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(), cntl.release(),
req.release(), res.release(), server,
<<<<<<< HEAD
method_status, msg->received_us());
=======
method_status, start_parse_us);
>>>>>>> auto max_concurrency limiter
}
bool VerifyRpcRequest(const InputMessageBase* msg_base) {
......
......@@ -32,6 +32,10 @@ void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
int ConstantConcurrencyLimiter::CurrentMaxConcurrency() const {
return _max_concurrency;
}
int ConstantConcurrencyLimiter::MaxConcurrency() const {
return _max_concurrency;
}
......
......@@ -32,8 +32,9 @@ public:
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
virtual int MaxConcurrency() const override;
virtual int& MaxConcurrency() override;
int CurrentMaxConcurrency() const override;
int MaxConcurrency() const override;
int& MaxConcurrency() override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
......
......@@ -17,8 +17,9 @@
#ifndef BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
#include "brpc/concurrency_limiter.h"
#include "bvar/bvar.h"
#include "butil/containers/bounded_queue.h"
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
......@@ -29,6 +30,7 @@ public:
~GradientConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int CurrentMaxConcurrency() const override;
int MaxConcurrency() const override;
// For compatibility with the MaxConcurrencyOf() interface. When using
......@@ -39,6 +41,7 @@ public:
// effect.
int& MaxConcurrency() override;
int Expose(const butil::StringPiece& prefix) override;
GradientConcurrencyLimiter* New() const override;
void Destroy() override;
......@@ -60,11 +63,13 @@ private:
};
struct WindowSnap {
WindowSnap(int64_t latency_us, int32_t concurrency)
WindowSnap(int64_t latency_us, int32_t concurrency, int32_t succ_req)
: avg_latency_us(latency_us)
, actuall_concurrency(concurrency) {}
, actual_concurrency(concurrency)
, total_succ_req(succ_req) {}
int64_t avg_latency_us;
int32_t actuall_concurrency;
int32_t actual_concurrency;
int32_t total_succ_req;
};
void AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
......@@ -74,12 +79,13 @@ private:
void ResetSampleWindow(int64_t sampling_time_us);
SampleWindow _sw;
std::vector<WindowSnap> _ws_queue;
butil::BoundedQueue<WindowSnap> _ws_queue;
uint32_t _ws_index;
int32_t _unused_max_concurrency;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _total_succ_req;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _max_concurrency;
butil::atomic<int32_t> BAIDU_CACHELINE_ALIGNMENT _current_concurrency;
};
......
......@@ -557,12 +557,12 @@ static void SendHttpResponse(Controller *cntl,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
ScopedMethodStatus method_status(method_status_raw, server,
cntl, received_us);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
Socket* socket = accessor.get_sending_socket();
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) {
socket->SetFailed();
......@@ -727,10 +727,6 @@ static void SendHttpResponse(Controller *cntl,
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
inline void SendHttpResponse(Controller *cntl, const Server* svr,
......@@ -1172,19 +1168,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
MethodStatus* method_status = sp->status;
if (method_status) {
if (!method_status->OnRequested()) {
<<<<<<< HEAD
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
=======
cntl->SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status);
>>>>>>> auto max_concurrency limiter
}
}
......@@ -1200,16 +1187,9 @@ void ProcessHttpRequest(InputMessageBase *msg) {
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (!server_accessor.AddConcurrency(cntl.get())) {
<<<<<<< HEAD
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
static_cast<int>(server->options().max_concurrency));
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
=======
cntl->SetFailed(
ELIMIT, "Reached server's max_concurrency=%d",
static_cast<int>((server->options().max_concurrency)));
return SendHttpResponse(cntl.release(), server, method_status);
>>>>>>> auto max_concurrency limiter
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
......
......@@ -231,12 +231,12 @@ static void SendHuluResponse(int64_t correlation_id,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
ScopedMethodStatus method_status(method_status_raw, server,
cntl, received_us);
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) {
sock->SetFailed();
......@@ -318,10 +318,6 @@ static void SendHuluResponse(int64_t correlation_id,
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
// Defined in baidu_rpc_protocol.cpp
......@@ -460,8 +456,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
method_status->current_max_concurrency());
break;
}
}
......
......@@ -60,7 +60,8 @@ SendMongoResponse::~SendMongoResponse() {
void SendMongoResponse::Run() {
std::unique_ptr<SendMongoResponse> delete_self(this);
ScopedMethodStatus method_status(status, &cntl, butil::cpuwide_time_us());
ScopedMethodStatus method_status(status, server,
&cntl, received_us);
Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket();
if (cntl.IsCloseConnection()) {
......@@ -102,10 +103,6 @@ void SendMongoResponse::Run() {
return;
}
}
if (method_status) {
method_status.release()->OnResponded(
cntl.ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
ParseResult ParseMongoMessage(butil::IOBuf* source,
......@@ -249,8 +246,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
mongo_done->cntl.SetFailed(
ELIMIT, "Reached %s's max_concurrency=%d",
mp->method->full_name().c_str(),
const_cast<const MethodStatus*>(
method_status)->max_concurrency());
method_status->current_max_concurrency());
break;
}
}
......
......@@ -64,7 +64,6 @@ public:
void NsheadClosure::Run() {
// Recycle itself after `Run'
std::unique_ptr<NsheadClosure, DeleteNsheadClosure> recycle_ctx(this);
ScopedRemoveConcurrency remove_concurrency_dummy(_server, &_controller);
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
......@@ -73,7 +72,8 @@ void NsheadClosure::Run() {
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(_server->options().nshead_service->_status,
&_controller, butil::cpuwide_time_us());
_server, &_controller,
_received_us);
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
......@@ -124,10 +124,6 @@ void NsheadClosure::Run() {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
_controller.ErrorCode(), butil::cpuwide_time_us() - _received_us);
}
}
void NsheadClosure::SetMethodName(const std::string& full_method_name) {
......
......@@ -215,12 +215,12 @@ static void SendSofaResponse(int64_t correlation_id,
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(method_status_raw, cntl, start_parse_us);
ScopedMethodStatus method_status(method_status_raw, server,
cntl, received_us);
Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) {
sock->SetFailed();
......@@ -294,10 +294,6 @@ static void SendSofaResponse(int64_t correlation_id,
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
cntl->ErrorCode(), butil::cpuwide_time_us() - received_us);
}
}
// Defined in baidu_rpc_protocol.cpp
......@@ -416,8 +412,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
const_cast<MethodStatus*>(
method_status)->max_concurrency());
method_status->current_max_concurrency());
break;
}
}
......
......@@ -232,9 +232,8 @@ void ThriftClosure::DoRun() {
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(server->options().thrift_service ?
server->options().thrift_service->_status : NULL);
ScopedMethodStatus method_status(_server->options().thrift_service->_status,
_server, &_controller, cpuwide_start_us());
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
......@@ -526,7 +525,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
static_cast<int>(server->options().max_concurrency));
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment