Commit ffca94c0 authored by TousakaRin's avatar TousakaRin Committed by TousakaRin

1.Simplified the design of CircuitBreaker. 2. modified the comments

parent 499340aa
......@@ -69,9 +69,9 @@ struct ChannelOptions {
// Maximum: INT_MAX
int max_retry;
// When the error rate of an endpoint is too high, deactivate the node.
// Note that this deactive is GLOBAL, and the endpoint will become
// unavailable for the next period of time after the deactive.
// When the error rate of a server node is too high, isolate the node.
// Note that this isolation is GLOBAL, the node will become unavailable
// for all channels running in this process during the isolation.
// Default: false
bool enable_circuit_breaker;
......
......@@ -90,6 +90,14 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
return healthy;
}
void CircuitBreaker::EmaErrorRecorder::Reset() {
_init_completed.store(false, butil::memory_order_relaxed);
_sample_count.store(0, butil::memory_order_relaxed);
_ema_error_cost.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
_broken.store(false, butil::memory_order_relaxed);
}
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
while (true) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
......@@ -139,48 +147,21 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
return true;
}
void CircuitBreaker::EmaErrorRecorder::Reset() {
_init_completed.store(false, butil::memory_order_relaxed);
_sample_count.store(0, butil::memory_order_relaxed);
_ema_error_cost.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
_broken.store(false, butil::memory_order_relaxed);
CircuitBreaker::CircuitBreaker()
: _long_window(FLAGS_circuit_breaker_long_window_size,
FLAGS_circuit_breaker_long_window_error_percent)
, _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent) {
}
CircuitBreaker::CircuitBreaker() {
auto short_recorder_adder =
std::bind(&CircuitBreaker::AddErrorRecorder,
std::placeholders::_1,
FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent);
auto long_recorder_adder =
std::bind(&CircuitBreaker::AddErrorRecorder,
std::placeholders::_1,
FLAGS_circuit_breaker_long_window_size,
FLAGS_circuit_breaker_long_window_error_percent);
_recorders.Modify(short_recorder_adder);
_recorders.Modify(long_recorder_adder);
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
return _long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency);
}
void CircuitBreaker::Reset() {
auto recorder_reseter = std::bind(&CircuitBreaker::ResetEmaRecorders,
std::placeholders::_1);
_recorders.Modify(recorder_reseter);
}
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
butil::DoublyBufferedData<ErrRecorderList>::ScopedPtr p;
if (0 == _recorders.Read(&p)) {
for (auto& recorder : (*p)) {
if (!recorder->OnCallEnd(error_code, latency)) {
return false;
}
}
return true;
} else {
LOG(WARNING) << "EmaErrorRecorder no data";
return true;
}
_long_window.Reset();
_short_window.Reset();
}
} // namespace brpc
......@@ -28,21 +28,19 @@ public:
~CircuitBreaker() {}
// Sampling the current rpc. Returns false if the endpoint needs to
// be deactivated. Otherwise return true.
// Sampling the current rpc. Returns false if a node needs to
// be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call.
// Note: Once OnCallEnd() determines that a node needs to be deactivated,
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency);
// Reset circuit breaker, will erase the historical data and start
// sampling again.
// This method is thread safe, and it is inefficient, you better
// call it only when you need.
// Reset CircuitBreaker and clear history data. will erase the historical
// data and start sampling again. Before you call this method, you need to
// ensure that no one else is calling OnCallEnd.
void Reset();
private:
class EmaErrorRecorder {
public:
......@@ -53,7 +51,6 @@ private:
private:
int64_t UpdateLatency(int64_t latency);
bool UpdateErrorCost(int64_t latency, int64_t ema_latency);
void OnStarting(int error_code, int64_t latency);
const int _window_size;
const int _max_error_percent;
......@@ -64,23 +61,9 @@ private:
butil::atomic<int64_t> _ema_latency;
butil::atomic<bool> _broken;
};
typedef std::vector<std::unique_ptr<EmaErrorRecorder>> ErrRecorderList;
static bool ResetEmaRecorders(ErrRecorderList& recorders) {
for (auto& recorder : recorders) {
recorder->Reset();
}
return true;
}
static bool AddErrorRecorder(ErrRecorderList& recorders,
int window_size, int max_error_percent){
recorders.emplace_back(
new EmaErrorRecorder(window_size, max_error_percent));
return true;
}
butil::DoublyBufferedData<ErrRecorderList> _recorders;
EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window;
};
} // namespace brpc
......
......@@ -778,8 +778,8 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
if (enable_circuit_breaker) {
SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) {
sock->FeedbackCircuitBreaker(
error_code, butil::gettimeofday_us() - begin_time_us);
// sock->GetSharedPart()->circuit_breaker.OnCallEnd(
// error_code, butil::gettimeofday_us() - begin_time_us);
}
}
}
......
......@@ -42,6 +42,7 @@
#include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#include "brpc/circuit_breaker.h" // CircuitBreaker
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
......@@ -177,6 +178,8 @@ public:
// For computing stats.
ExtendedSocketStat* extended_stat;
CircuitBreaker circuit_breaker;
explicit SharedPart(SocketId creator_socket_id);
~SharedPart();
......@@ -737,7 +740,12 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q->clear();
}
}
_circuit_breaker.Reset();
SharedPart* sp = GetSharedPart();
if (sp) {
sp->circuit_breaker.Reset();
}
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
CHECK_EQ(0, _unwritten_bytes.load(butil::memory_order_relaxed));
CHECK(!_overcrowded);
......@@ -874,6 +882,14 @@ int Socket::SetFailed() {
return SetFailed(EFAILEDSOCKET, NULL);
}
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR)
<< "Socket[" << *this << "] deactivted by circuit breaker";
SetFailed();
}
}
int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
const int64_t last_active_us = last_active_time_us();
if (butil::cpuwide_time_us() - last_active_us <= idle_seconds * 1000000L) {
......
......@@ -316,13 +316,7 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id);
void FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!_circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR)
<< "Socket[" << *this << "] deactivted by circuit breaker";
SetFailed();
}
}
void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
bool Failed() const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment