Commit f9d80bdc authored by TousakaRin's avatar TousakaRin

Replace _enable_circuit_breaker with FALGS_ENABLED_CIRCUIT_BREAKER

parent 3706336b
...@@ -384,7 +384,9 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -384,7 +384,9 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
cntl->_request_protocol = _options.protocol; cntl->_request_protocol = _options.protocol;
cntl->_preferred_index = _preferred_index; cntl->_preferred_index = _preferred_index;
cntl->_retry_policy = _options.retry_policy; cntl->_retry_policy = _options.retry_policy;
cntl->_enable_circuit_breaker = _options.enable_circuit_breaker; if (_options.enable_circuit_breaker) {
cntl->add_flag(Controller::FLAGS_ENABLED_CIRCUIT_BREAKER);
}
const CallId correlation_id = cntl->call_id(); const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range( const int rc = bthread_id_lock_and_reset_range(
correlation_id, NULL, 2 + cntl->max_retry()); correlation_id, NULL, 2 + cntl->max_retry());
......
...@@ -216,7 +216,6 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -216,7 +216,6 @@ void Controller::InternalReset(bool in_constructor) {
_rpc_dump_meta = NULL; _rpc_dump_meta = NULL;
_request_protocol = PROTOCOL_UNKNOWN; _request_protocol = PROTOCOL_UNKNOWN;
_max_retry = UNSET_MAGIC_NUM; _max_retry = UNSET_MAGIC_NUM;
_enable_circuit_breaker = false;
_retry_policy = NULL; _retry_policy = NULL;
_correlation_id = INVALID_BTHREAD_ID; _correlation_id = INVALID_BTHREAD_ID;
_connection_type = CONNECTION_TYPE_UNKNOWN; _connection_type = CONNECTION_TYPE_UNKNOWN;
...@@ -259,7 +258,6 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -259,7 +258,6 @@ void Controller::InternalReset(bool in_constructor) {
Controller::Call::Call(Controller::Call* rhs) Controller::Call::Call(Controller::Call* rhs)
: nretry(rhs->nretry) : nretry(rhs->nretry)
, need_feedback(rhs->need_feedback) , need_feedback(rhs->need_feedback)
, enable_circuit_breaker(rhs->enable_circuit_breaker)
, touched_by_stream_creator(rhs->touched_by_stream_creator) , touched_by_stream_creator(rhs->touched_by_stream_creator)
, peer_id(rhs->peer_id) , peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us) , begin_time_us(rhs->begin_time_us)
...@@ -268,7 +266,6 @@ Controller::Call::Call(Controller::Call* rhs) ...@@ -268,7 +266,6 @@ Controller::Call::Call(Controller::Call* rhs)
// setting all the fields to next call and _current_call.OnComplete // setting all the fields to next call and _current_call.OnComplete
// will behave incorrectly. // will behave incorrectly.
rhs->need_feedback = false; rhs->need_feedback = false;
rhs->enable_circuit_breaker = false;
rhs->touched_by_stream_creator = false; rhs->touched_by_stream_creator = false;
rhs->peer_id = (SocketId)-1; rhs->peer_id = (SocketId)-1;
} }
...@@ -280,7 +277,6 @@ Controller::Call::~Call() { ...@@ -280,7 +277,6 @@ Controller::Call::~Call() {
void Controller::Call::Reset() { void Controller::Call::Reset() {
nretry = 0; nretry = 0;
need_feedback = false; need_feedback = false;
enable_circuit_breaker = false;
touched_by_stream_creator = false; touched_by_stream_creator = false;
peer_id = (SocketId)-1; peer_id = (SocketId)-1;
begin_time_us = 0; begin_time_us = 0;
...@@ -767,6 +763,10 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, ...@@ -767,6 +763,10 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
c->stream_creator()->CleanupSocketForStream( c->stream_creator()->CleanupSocketForStream(
sending_sock.get(), c, error_code); sending_sock.get(), c, error_code);
} }
if (enable_circuit_breaker) {
sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
// Release the `Socket' we used to send/receive data // Release the `Socket' we used to send/receive data
sending_sock.reset(NULL); sending_sock.reset(NULL);
...@@ -775,13 +775,6 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, ...@@ -775,13 +775,6 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
{ begin_time_us, peer_id, error_code, c }; { begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (enable_circuit_breaker) {
SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) {
sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
}
} }
void Controller::EndRPC(const CompletionInfo& info) { void Controller::EndRPC(const CompletionInfo& info) {
...@@ -974,7 +967,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -974,7 +967,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Pick a target server for sending RPC // Pick a target server for sending RPC
_current_call.need_feedback = false; _current_call.need_feedback = false;
_current_call.enable_circuit_breaker = _enable_circuit_breaker; _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
SocketUniquePtr tmp_sock; SocketUniquePtr tmp_sock;
if (SingleServer()) { if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction // Don't use _current_call.peer_id which is set to -1 after construction
......
...@@ -130,6 +130,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); ...@@ -130,6 +130,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13); static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15); static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16); static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
public: public:
Controller(); Controller();
...@@ -601,6 +602,16 @@ private: ...@@ -601,6 +602,16 @@ private:
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); } void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); } bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
<<<<<<< HEAD
=======
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
bool has_enabled_circuit_breaker() const {
return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER);
}
>>>>>>> Replace _enable_circuit_breaker with FALGS_ENABLED_CIRCUIT_BREAKER
private: private:
// NOTE: align and group fields to make Controller as compact as possible. // NOTE: align and group fields to make Controller as compact as possible.
...@@ -622,7 +633,6 @@ private: ...@@ -622,7 +633,6 @@ private:
// Some of them are copied from `Channel' which might be destroyed // Some of them are copied from `Channel' which might be destroyed
// after CallMethod. // after CallMethod.
int _max_retry; int _max_retry;
bool _enable_circuit_breaker;
const RetryPolicy* _retry_policy; const RetryPolicy* _retry_policy;
// Synchronization object for one RPC call. It remains unchanged even // Synchronization object for one RPC call. It remains unchanged even
// when retry happens. Synchronous RPC will wait on this id. // when retry happens. Synchronous RPC will wait on this id.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment