Commit db34f960 authored by TousakaRin's avatar TousakaRin Committed by TousakaRin

Add circuit breaker for channel

parent 74abadbe
...@@ -44,6 +44,7 @@ ChannelOptions::ChannelOptions() ...@@ -44,6 +44,7 @@ ChannelOptions::ChannelOptions()
, timeout_ms(500) , timeout_ms(500)
, backup_request_ms(-1) , backup_request_ms(-1)
, max_retry(3) , max_retry(3)
, enable_circuit_breaker(false)
, protocol(PROTOCOL_BAIDU_STD) , protocol(PROTOCOL_BAIDU_STD)
, connection_type(CONNECTION_TYPE_UNKNOWN) , connection_type(CONNECTION_TYPE_UNKNOWN)
, succeed_without_server(true) , succeed_without_server(true)
...@@ -383,6 +384,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -383,6 +384,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
cntl->_request_protocol = _options.protocol; cntl->_request_protocol = _options.protocol;
cntl->_preferred_index = _preferred_index; cntl->_preferred_index = _preferred_index;
cntl->_retry_policy = _options.retry_policy; cntl->_retry_policy = _options.retry_policy;
cntl->_enable_circuit_breaker = _options.enable_circuit_breaker;
const CallId correlation_id = cntl->call_id(); const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range( const int rc = bthread_id_lock_and_reset_range(
correlation_id, NULL, 2 + cntl->max_retry()); correlation_id, NULL, 2 + cntl->max_retry());
......
...@@ -69,6 +69,12 @@ struct ChannelOptions { ...@@ -69,6 +69,12 @@ struct ChannelOptions {
// Maximum: INT_MAX // Maximum: INT_MAX
int max_retry; int max_retry;
// When the error rate of an endpoint is too high, deactivate the node.
// Note that this deactive is GLOBAL, and the endpoint will become
// unavailable for the next period of time after the deactive.
// Default: false
bool enable_circuit_breaker;
// Serialization protocol, defined in src/brpc/options.proto // Serialization protocol, defined in src/brpc/options.proto
// NOTE: You can assign name of the protocol to this field as well, for // NOTE: You can assign name of the protocol to this field as well, for
// Example: options.protocol = "baidu_std"; // Example: options.protocol = "baidu_std";
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include <cmath>
#include <gflags/gflags.h>
#include "brpc/circuit_breaker.h"
namespace brpc {
DEFINE_int32(circuit_breaker_short_window_size, 30,
"Short window sample size.");
DEFINE_int32(circuit_breaker_long_window_size, 100,
"Long window sample size.");
DEFINE_int32(circuit_breaker_short_window_error_percent, 5,
"The maximum error rate allowed by the short window, ranging from 0-99.");
DEFINE_int32(circuit_breaker_long_window_error_percent, 3,
"The maximum error rate allowed by the long window, ranging from 0-99.");
DEFINE_int32(circuit_breaker_min_error_cost_us, 100,
"The minimum error_cost, when the ema of error cost is less than this "
"value, it will be set to zero.");
namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
// The larger the EPSILON, the larger the smoothing coefficient, which means
// that the proportion of early data is larger.
// smooth = pow(EPSILON, 1 / window_size),
// eg: when window_size = 100,
// EPSILON = 0.1, smooth = 0.9772
// EPSILON = 0.3, smooth = 0.9880
// when window_size = 30,
// EPSILON = 0.1, smooth = 0.9261
// EPSILON = 0.3, smooth = 0.9606
const double EPSILON = 0.1;
} // namepace
CircuitBreaker::EmaErrorRecorder::EmaErrorRecorder(int window_size,
int max_error_percent)
: _window_size(window_size)
, _max_error_percent(max_error_percent)
, _smooth(std::pow(EPSILON, 1.0/window_size))
, _init_completed(false)
, _sample_count(0)
, _ema_error_cost(0)
, _ema_latency(0)
, _broken(false) {
}
bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
int64_t latency) {
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
int64_t ema_latency = 0;
bool healthy = false;
if (error_code == 0) {
ema_latency = UpdateLatency(latency);
healthy = UpdateErrorCost(0, ema_latency);
} else {
ema_latency = _ema_latency.load(butil::memory_order_relaxed);
healthy = UpdateErrorCost(latency, ema_latency);
}
int sample_count = _sample_count.fetch_add(1, butil::memory_order_relaxed);
bool init_completed = _init_completed.load(butil::memory_order_acquire);
if (!init_completed && sample_count >= _window_size) {
_init_completed.store(true, butil::memory_order_release);
init_completed = true;
}
if (!init_completed) {
return true;
}
if (!healthy) {
_broken.store(true, butil::memory_order_relaxed);
}
return healthy;
}
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
while (true) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
int64_t next_ema_latency = 0;
if (0 == ema_latency) {
next_ema_latency = latency;
} else {
next_ema_latency = ema_latency * _smooth + latency * (1 - _smooth);
}
if (_ema_latency.compare_exchange_weak(ema_latency, next_ema_latency)) {
return next_ema_latency;
}
}
}
bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
int64_t ema_latency) {
//Errorous response
if (error_cost != 0) {
int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
ema_error_cost += error_cost;
int64_t max_error_cost = ema_latency * _window_size *
(_max_error_percent / 100.0) * (1.0 + EPSILON);
return ema_error_cost <= max_error_cost;
}
//Ordinary response
while (true) {
int64_t ema_error_cost =
_ema_error_cost.load(butil::memory_order_relaxed);
if (ema_error_cost == 0) {
break;
} else if (ema_error_cost < FLAGS_circuit_breaker_min_error_cost_us) {
if (_ema_error_cost.compare_exchange_weak(
ema_error_cost, 0, butil::memory_order_relaxed)) {
break;
}
} else {
int64_t next_ema_error_cost = ema_error_cost * _smooth;
if (_ema_error_cost.compare_exchange_weak(
ema_error_cost, next_ema_error_cost)) {
break;
}
}
}
return true;
}
void CircuitBreaker::EmaErrorRecorder::Reset() {
_init_completed.store(false, butil::memory_order_relaxed);
_sample_count.store(0, butil::memory_order_relaxed);
_ema_error_cost.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
_broken.store(false, butil::memory_order_relaxed);
}
CircuitBreaker::CircuitBreaker() {
auto short_recorder_adder =
std::bind(&CircuitBreaker::AddErrorRecorder,
std::placeholders::_1,
FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent);
auto long_recorder_adder =
std::bind(&CircuitBreaker::AddErrorRecorder,
std::placeholders::_1,
FLAGS_circuit_breaker_long_window_size,
FLAGS_circuit_breaker_long_window_error_percent);
_recorders.Modify(short_recorder_adder);
_recorders.Modify(long_recorder_adder);
}
void CircuitBreaker::Reset() {
auto recorder_reseter = std::bind(&CircuitBreaker::ResetEmaRecorders,
std::placeholders::_1);
_recorders.Modify(recorder_reseter);
}
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
butil::DoublyBufferedData<ErrRecorderList>::ScopedPtr p;
if (0 == _recorders.Read(&p)) {
for (auto& recorder : (*p)) {
if (!recorder->OnCallEnd(error_code, latency)) {
return false;
}
}
return true;
} else {
LOG(WARNING) << "EmaErrorRecorder no data";
return true;
}
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H
#include "butil/containers/doubly_buffered_data.h"
#include "butil/atomicops.h"
namespace brpc {
class CircuitBreaker {
public:
CircuitBreaker();
~CircuitBreaker() {}
// Sampling the current rpc. Returns false if the endpoint needs to
// be deactivated. Otherwise return true.
// error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call.
// Note: Once OnCallEnd() determines that a node needs to be deactivated,
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency);
// Reset circuit breaker, will erase the historical data and start
// sampling again.
// This method is thread safe, and it is inefficient, you better
// call it only when you need.
void Reset();
private:
class EmaErrorRecorder {
public:
EmaErrorRecorder(int windows_size, int max_error_percent);
bool OnCallEnd(int error_code, int64_t latency);
void Reset();
private:
int64_t UpdateLatency(int64_t latency);
bool UpdateErrorCost(int64_t latency, int64_t ema_latency);
void OnStarting(int error_code, int64_t latency);
const int _window_size;
const int _max_error_percent;
const double _smooth;
butil::atomic<bool> _init_completed;
butil::atomic<int> _sample_count;
butil::atomic<int64_t> _ema_error_cost;
butil::atomic<int64_t> _ema_latency;
butil::atomic<bool> _broken;
};
typedef std::vector<std::unique_ptr<EmaErrorRecorder>> ErrRecorderList;
static bool ResetEmaRecorders(ErrRecorderList& recorders) {
for (auto& recorder : recorders) {
recorder->Reset();
}
return true;
}
static bool AddErrorRecorder(ErrRecorderList& recorders,
int window_size, int max_error_percent){
recorders.emplace_back(
new EmaErrorRecorder(window_size, max_error_percent));
return true;
}
butil::DoublyBufferedData<ErrRecorderList> _recorders;
};
} // namespace brpc
#endif // BRPC_CIRCUIT_BREAKER_H_
...@@ -216,6 +216,7 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -216,6 +216,7 @@ void Controller::InternalReset(bool in_constructor) {
_rpc_dump_meta = NULL; _rpc_dump_meta = NULL;
_request_protocol = PROTOCOL_UNKNOWN; _request_protocol = PROTOCOL_UNKNOWN;
_max_retry = UNSET_MAGIC_NUM; _max_retry = UNSET_MAGIC_NUM;
_enable_circuit_breaker = false;
_retry_policy = NULL; _retry_policy = NULL;
_correlation_id = INVALID_BTHREAD_ID; _correlation_id = INVALID_BTHREAD_ID;
_connection_type = CONNECTION_TYPE_UNKNOWN; _connection_type = CONNECTION_TYPE_UNKNOWN;
...@@ -258,6 +259,7 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -258,6 +259,7 @@ void Controller::InternalReset(bool in_constructor) {
Controller::Call::Call(Controller::Call* rhs) Controller::Call::Call(Controller::Call* rhs)
: nretry(rhs->nretry) : nretry(rhs->nretry)
, need_feedback(rhs->need_feedback) , need_feedback(rhs->need_feedback)
, enable_circuit_breaker(rhs->enable_circuit_breaker)
, touched_by_stream_creator(rhs->touched_by_stream_creator) , touched_by_stream_creator(rhs->touched_by_stream_creator)
, peer_id(rhs->peer_id) , peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us) , begin_time_us(rhs->begin_time_us)
...@@ -266,6 +268,7 @@ Controller::Call::Call(Controller::Call* rhs) ...@@ -266,6 +268,7 @@ Controller::Call::Call(Controller::Call* rhs)
// setting all the fields to next call and _current_call.OnComplete // setting all the fields to next call and _current_call.OnComplete
// will behave incorrectly. // will behave incorrectly.
rhs->need_feedback = false; rhs->need_feedback = false;
rhs->enable_circuit_breaker = false;
rhs->touched_by_stream_creator = false; rhs->touched_by_stream_creator = false;
rhs->peer_id = (SocketId)-1; rhs->peer_id = (SocketId)-1;
} }
...@@ -277,6 +280,7 @@ Controller::Call::~Call() { ...@@ -277,6 +280,7 @@ Controller::Call::~Call() {
void Controller::Call::Reset() { void Controller::Call::Reset() {
nretry = 0; nretry = 0;
need_feedback = false; need_feedback = false;
enable_circuit_breaker = false;
touched_by_stream_creator = false; touched_by_stream_creator = false;
peer_id = (SocketId)-1; peer_id = (SocketId)-1;
begin_time_us = 0; begin_time_us = 0;
...@@ -771,6 +775,13 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, ...@@ -771,6 +775,13 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
{ begin_time_us, peer_id, error_code, c }; { begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (enable_circuit_breaker) {
SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) {
sock->FeedbackCircuitBreaker(
error_code, butil::gettimeofday_us() - begin_time_us);
}
}
} }
void Controller::EndRPC(const CompletionInfo& info) { void Controller::EndRPC(const CompletionInfo& info) {
...@@ -963,6 +974,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -963,6 +974,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Pick a target server for sending RPC // Pick a target server for sending RPC
_current_call.need_feedback = false; _current_call.need_feedback = false;
_current_call.enable_circuit_breaker = _enable_circuit_breaker;
SocketUniquePtr tmp_sock; SocketUniquePtr tmp_sock;
if (SingleServer()) { if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction // Don't use _current_call.peer_id which is set to -1 after construction
......
...@@ -561,6 +561,7 @@ private: ...@@ -561,6 +561,7 @@ private:
int nretry; // sent in nretry-th retry. int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback. bool need_feedback; // The LB needs feedback.
bool enable_circuit_breaker; // The channel enabled circuit_breaker
bool touched_by_stream_creator; bool touched_by_stream_creator;
SocketId peer_id; // main server id SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time. int64_t begin_time_us; // sent real time.
...@@ -621,6 +622,7 @@ private: ...@@ -621,6 +622,7 @@ private:
// Some of them are copied from `Channel' which might be destroyed // Some of them are copied from `Channel' which might be destroyed
// after CallMethod. // after CallMethod.
int _max_retry; int _max_retry;
bool _enable_circuit_breaker;
const RetryPolicy* _retry_policy; const RetryPolicy* _retry_policy;
// Synchronization object for one RPC call. It remains unchanged even // Synchronization object for one RPC call. It remains unchanged even
// when retry happens. Synchronous RPC will wait on this id. // when retry happens. Synchronous RPC will wait on this id.
......
...@@ -737,6 +737,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { ...@@ -737,6 +737,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q->clear(); _pipeline_q->clear();
} }
} }
_circuit_breaker.Reset();
CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
CHECK_EQ(0, _unwritten_bytes.load(butil::memory_order_relaxed)); CHECK_EQ(0, _unwritten_bytes.load(butil::memory_order_relaxed));
CHECK(!_overcrowded); CHECK(!_overcrowded);
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "brpc/options.pb.h" // ConnectionType #include "brpc/options.pb.h" // ConnectionType
#include "brpc/socket_id.h" // SocketId #include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr #include "brpc/socket_message.h" // SocketMessagePtr
#include "brpc/circuit_breaker.h" // CircuitBreaker
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -315,6 +316,14 @@ public: ...@@ -315,6 +316,14 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4))); __attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id); static int SetFailed(SocketId id);
void FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!_circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR)
<< "Socket[" << *this << "] deactivted by circuit breaker";
SetFailed();
}
}
bool Failed() const; bool Failed() const;
bool DidReleaseAdditionalRereference() const bool DidReleaseAdditionalRereference() const
...@@ -759,6 +768,8 @@ private: ...@@ -759,6 +768,8 @@ private:
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set; std::set<StreamId> *_stream_set;
CircuitBreaker _circuit_breaker;
}; };
} // namespace brpc } // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment