Commit b67ef038 authored by TousakaRin's avatar TousakaRin

Exposing the status of CircuitBreaker to the builtIn page

parent 3afac97f
...@@ -122,6 +122,9 @@ void ConnectionsService::PrintConnections( ...@@ -122,6 +122,9 @@ void ConnectionsService::PrintConnections(
os << "<th>SSL</th>" os << "<th>SSL</th>"
"<th>Protocol</th>" "<th>Protocol</th>"
"<th>fd</th>" "<th>fd</th>"
"<th>error_count</th>"
"<th>health_index</th>"
"<th>broken_times</th>"
"<th>InBytes/s</th>" "<th>InBytes/s</th>"
"<th>In/s</th>" "<th>In/s</th>"
"<th>InBytes/m</th>" "<th>InBytes/m</th>"
...@@ -139,6 +142,7 @@ void ConnectionsService::PrintConnections( ...@@ -139,6 +142,7 @@ void ConnectionsService::PrintConnections(
os << "Local|"; os << "Local|";
} }
os << "SSL|Protocol |fd |" os << "SSL|Protocol |fd |"
"error_count|health_index|broken_times|"
"InBytes/s|In/s |InBytes/m |In/m |" "InBytes/s|In/s |InBytes/m |In/m |"
"OutBytes/s|Out/s |OutBytes/m|Out/m |" "OutBytes/s|Out/s |OutBytes/m|Out/m |"
"Rtt/Var(ms)|SocketId\n"; "Rtt/Var(ms)|SocketId\n";
...@@ -177,6 +181,9 @@ void ConnectionsService::PrintConnections( ...@@ -177,6 +181,9 @@ void ConnectionsService::PrintConnections(
os << min_width("-", 3) << bar os << min_width("-", 3) << bar
<< min_width("-", 12) << bar << min_width("-", 12) << bar
<< min_width("-", 5) << bar << min_width("-", 5) << bar
<< min_width(ptr->error_count(), 11) << bar
<< min_width("0", 12) << bar
<< min_width(ptr->broken_times(), 12) << bar
<< min_width("-", 9) << bar << min_width("-", 9) << bar
<< min_width("-", 6) << bar << min_width("-", 6) << bar
<< min_width("-", 10) << bar << min_width("-", 10) << bar
...@@ -288,7 +295,10 @@ void ConnectionsService::PrintConnections( ...@@ -288,7 +295,10 @@ void ConnectionsService::PrintConnections(
} else { } else {
os << min_width("-", 5) << bar; os << min_width("-", 5) << bar;
} }
os << min_width(stat.in_size_s, 9) << bar os << min_width(ptr->error_count(), 11) << bar
<< min_width(ptr->health_index_in_percent(), 12) << bar
<< min_width(ptr->broken_times(), 12) << bar
<< min_width(stat.in_size_s, 9) << bar
<< min_width(stat.in_num_messages_s, 6) << bar << min_width(stat.in_num_messages_s, 6) << bar
<< min_width(stat.in_size_m, 10) << bar << min_width(stat.in_size_m, 10) << bar
<< min_width(stat.in_num_messages_m, 8) << bar << min_width(stat.in_num_messages_m, 8) << bar
......
...@@ -89,6 +89,20 @@ void CircuitBreaker::EmaErrorRecorder::Reset() { ...@@ -89,6 +89,20 @@ void CircuitBreaker::EmaErrorRecorder::Reset() {
_ema_latency.store(0, butil::memory_order_relaxed); _ema_latency.store(0, butil::memory_order_relaxed);
} }
int64_t CircuitBreaker::EmaErrorRecorder::max_error_cost() const {
const int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
return ema_latency * _window_size * (_max_error_percent / 100.0) * (1.0 + EPSILON);
}
int CircuitBreaker::EmaErrorRecorder::health_index_in_percent() const {
const int64_t current_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
const int64_t error_cost_threshold = max_error_cost();
if (error_cost_threshold == 0) {
return current_error_cost == 0 ? 100 : 0;
}
return 100 - std::min<int>(100 * current_error_cost / error_cost_threshold, 100);
}
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) { int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed); int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
do { do {
...@@ -115,9 +129,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, ...@@ -115,9 +129,7 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
int64_t ema_error_cost = int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed); _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
ema_error_cost += error_cost; ema_error_cost += error_cost;
int64_t max_error_cost = ema_latency * _window_size * return ema_error_cost <= max_error_cost();
(_max_error_percent / 100.0) * (1.0 + EPSILON);
return ema_error_cost <= max_error_cost;
} }
//Ordinary response //Ordinary response
...@@ -147,45 +159,43 @@ CircuitBreaker::CircuitBreaker() ...@@ -147,45 +159,43 @@ CircuitBreaker::CircuitBreaker()
, _short_window(FLAGS_circuit_breaker_short_window_size, , _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent) FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms()) , _last_reset_time_ms(butil::cpuwide_time_ms())
, _broken(false) , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) { , _broken_times(0) {
} }
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
if (_broken.load(butil::memory_order_relaxed)) { return _long_window.OnCallEnd(error_code, latency) &&
return false; _short_window.OnCallEnd(error_code, latency);
}
if (_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
}
if (!_broken.exchange(true, butil::memory_order_acquire)) {
UpdateIsolationDuration();
}
return false;
} }
void CircuitBreaker::Reset() { void CircuitBreaker::Reset() {
_long_window.Reset(); _long_window.Reset();
_short_window.Reset(); _short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms(); _last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release); }
void CircuitBreaker::MarkAsBroken() {
++_broken_times;
UpdateIsolationDuration();
}
int CircuitBreaker::health_index_in_percent() const {
return std::min(_long_window.health_index_in_percent(),
_short_window.health_index_in_percent());
} }
void CircuitBreaker::UpdateIsolationDuration() { void CircuitBreaker::UpdateIsolationDuration() {
int64_t now_time_ms = butil::cpuwide_time_ms(); int64_t now_time_ms = butil::cpuwide_time_ms();
int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed);
const int max_isolation_duration_ms = const int max_isolation_duration_ms =
FLAGS_circuit_breaker_max_isolation_duration_ms; FLAGS_circuit_breaker_max_isolation_duration_ms;
const int min_isolation_duration_ms = const int min_isolation_duration_ms =
FLAGS_circuit_breaker_min_isolation_duration_ms; FLAGS_circuit_breaker_min_isolation_duration_ms;
if (now_time_ms - _last_reset_time_ms < max_isolation_duration_ms) { if (now_time_ms - _last_reset_time_ms < max_isolation_duration_ms) {
isolation_duration_ms = _isolation_duration_ms =
std::min(isolation_duration_ms * 2, max_isolation_duration_ms); std::min(_isolation_duration_ms * 2, max_isolation_duration_ms);
} else { } else {
isolation_duration_ms = min_isolation_duration_ms; _isolation_duration_ms = min_isolation_duration_ms;
} }
_isolation_duration_ms.store(isolation_duration_ms, butil::memory_order_relaxed);
} }
} // namespace brpc } // namespace brpc
...@@ -31,9 +31,6 @@ public: ...@@ -31,9 +31,6 @@ public:
// be isolated. Otherwise return true. // be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success. // error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call. // latency: Time cost of this call.
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency); bool OnCallEnd(int error_code, int64_t latency);
// Reset CircuitBreaker and clear history data. will erase the historical // Reset CircuitBreaker and clear history data. will erase the historical
...@@ -41,10 +38,19 @@ public: ...@@ -41,10 +38,19 @@ public:
// ensure that no one else is calling OnCallEnd. // ensure that no one else is calling OnCallEnd.
void Reset(); void Reset();
// Not thread-safe
void MarkAsBroken();
int health_index_in_percent() const;
int broken_times() const {
return _broken_times;
}
// The duration that should be isolated when the socket fails in milliseconds. // The duration that should be isolated when the socket fails in milliseconds.
// The higher the frequency of socket errors, the longer the duration. // The higher the frequency of socket errors, the longer the duration.
int isolation_duration_ms() { int isolation_duration_ms() const {
return _isolation_duration_ms.load(butil::memory_order_relaxed); return _isolation_duration_ms;
} }
private: private:
...@@ -55,6 +61,9 @@ private: ...@@ -55,6 +61,9 @@ private:
EmaErrorRecorder(int windows_size, int max_error_percent); EmaErrorRecorder(int windows_size, int max_error_percent);
bool OnCallEnd(int error_code, int64_t latency); bool OnCallEnd(int error_code, int64_t latency);
void Reset(); void Reset();
int64_t max_error_cost() const;
int health_index_in_percent() const;
private: private:
int64_t UpdateLatency(int64_t latency); int64_t UpdateLatency(int64_t latency);
...@@ -72,8 +81,8 @@ private: ...@@ -72,8 +81,8 @@ private:
EmaErrorRecorder _long_window; EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window; EmaErrorRecorder _short_window;
int64_t _last_reset_time_ms; int64_t _last_reset_time_ms;
butil::atomic<bool> _broken; int _isolation_duration_ms;
butil::atomic<int> _isolation_duration_ms; int _broken_times;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -700,6 +700,9 @@ void Controller::Call::OnComplete( ...@@ -700,6 +700,9 @@ void Controller::Call::OnComplete(
sending_sock->FeedbackCircuitBreaker(error_code, sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us); butil::gettimeofday_us() - begin_time_us);
} }
if (error_code != 0 && sending_sock) {
sending_sock->AddErrorCount();
}
switch (c->connection_type()) { switch (c->connection_type()) {
case CONNECTION_TYPE_UNKNOWN: case CONNECTION_TYPE_UNKNOWN:
...@@ -765,7 +768,7 @@ void Controller::Call::OnComplete( ...@@ -765,7 +768,7 @@ void Controller::Call::OnComplete(
sock->SetLogOff(); sock->SetLogOff();
} }
} }
if (need_feedback) { if (need_feedback) {
const LoadBalancer::CallInfo info = const LoadBalancer::CallInfo info =
{ begin_time_us, peer_id, error_code, c }; { begin_time_us, peer_id, error_code, c };
......
...@@ -178,6 +178,8 @@ public: ...@@ -178,6 +178,8 @@ public:
CircuitBreaker circuit_breaker; CircuitBreaker circuit_breaker;
butil::atomic<uint64_t> error_count;
explicit SharedPart(SocketId creator_socket_id); explicit SharedPart(SocketId creator_socket_id);
~SharedPart(); ~SharedPart();
...@@ -193,7 +195,8 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2) ...@@ -193,7 +195,8 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
, in_num_messages(0) , in_num_messages(0)
, out_size(0) , out_size(0)
, out_num_messages(0) , out_num_messages(0)
, extended_stat(NULL) { , extended_stat(NULL)
, error_count(0) {
} }
Socket::SharedPart::~SharedPart() { Socket::SharedPart::~SharedPart() {
...@@ -802,6 +805,34 @@ int Socket::ReleaseAdditionalReference() { ...@@ -802,6 +805,34 @@ int Socket::ReleaseAdditionalReference() {
return -1; return -1;
} }
void Socket::AddErrorCount() {
GetOrNewSharedPart()->error_count.fetch_add(1, butil::memory_order_relaxed);
}
int Socket::broken_times() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->circuit_breaker.broken_times();
}
return 0;
}
uint64_t Socket::error_count() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->error_count.load(butil::memory_order_relaxed);
}
return 0;
}
int Socket::health_index_in_percent() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->circuit_breaker.health_index_in_percent();
}
return 100;
}
int Socket::SetFailed(int error_code, const char* error_fmt, ...) { int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
if (error_code == 0) { if (error_code == 0) {
CHECK(false) << "error_code is 0"; CHECK(false) << "error_code is 0";
...@@ -876,8 +907,10 @@ int Socket::SetFailed() { ...@@ -876,8 +907,10 @@ int Socket::SetFailed() {
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) { void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) { if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker"; if (SetFailed(main_socket_id()) == 0) {
SetFailed(main_socket_id()); SetFailed(main_socket_id());
LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker";
}
} }
} }
......
...@@ -317,6 +317,14 @@ public: ...@@ -317,6 +317,14 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4))); __attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id); static int SetFailed(SocketId id);
void AddErrorCount();
uint64_t error_count() const;
int broken_times() const;
int health_index_in_percent() const;
void FeedbackCircuitBreaker(int error_code, int64_t latency_us); void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
bool Failed() const; bool Failed() const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment