Unverified Commit 2685cd8c authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #531 from TousakaRin/circuit_breaker

Exposing the status of CircuitBreaker to the builtIn page
parents 11e9156e 1737f164
...@@ -108,7 +108,7 @@ static std::string BriefName(const std::string& cname) { ...@@ -108,7 +108,7 @@ static std::string BriefName(const std::string& cname) {
void ConnectionsService::PrintConnections( void ConnectionsService::PrintConnections(
std::ostream& os, const std::vector<SocketId>& conns, std::ostream& os, const std::vector<SocketId>& conns,
bool use_html, const Server* server, bool need_local) const { bool use_html, const Server* server, bool is_channel_conn) const {
if (conns.empty()) { if (conns.empty()) {
return; return;
} }
...@@ -116,8 +116,10 @@ void ConnectionsService::PrintConnections( ...@@ -116,8 +116,10 @@ void ConnectionsService::PrintConnections(
os << "<table class=\"gridtable sortable\" border=\"1\"><tr>" os << "<table class=\"gridtable sortable\" border=\"1\"><tr>"
"<th>CreatedTime</th>" "<th>CreatedTime</th>"
"<th>RemoteSide</th>"; "<th>RemoteSide</th>";
if (need_local) { if (is_channel_conn) {
os << "<th>Local</th>"; os << "<th>Local</th>"
"<th>RecentErr</th>"
"<th>nbreak</th>";
} }
os << "<th>SSL</th>" os << "<th>SSL</th>"
"<th>Protocol</th>" "<th>Protocol</th>"
...@@ -135,8 +137,8 @@ void ConnectionsService::PrintConnections( ...@@ -135,8 +137,8 @@ void ConnectionsService::PrintConnections(
"</tr>\n"; "</tr>\n";
} else { } else {
os << "CreatedTime |RemoteSide |"; os << "CreatedTime |RemoteSide |";
if (need_local) { if (is_channel_conn) {
os << "Local|"; os << "Local|RecentErr|nbreak|";
} }
os << "SSL|Protocol |fd |" os << "SSL|Protocol |fd |"
"InBytes/s|In/s |InBytes/m |In/m |" "InBytes/s|In/s |InBytes/m |In/m |"
...@@ -171,8 +173,10 @@ void ConnectionsService::PrintConnections( ...@@ -171,8 +173,10 @@ void ConnectionsService::PrintConnections(
if (failed) { if (failed) {
os << min_width("Broken", 26) << bar os << min_width("Broken", 26) << bar
<< min_width(NameOfPoint(ptr->remote_side()), 19) << bar; << min_width(NameOfPoint(ptr->remote_side()), 19) << bar;
if (need_local) { if (is_channel_conn) {
os << min_width(ptr->local_side().port, 5) << bar; os << min_width(ptr->local_side().port, 5) << bar
<< min_width(ptr->recent_error_count(), 10) << bar
<< min_width(ptr->isolated_times(), 7) << bar;
} }
os << min_width("-", 3) << bar os << min_width("-", 3) << bar
<< min_width("-", 12) << bar << min_width("-", 12) << bar
...@@ -267,12 +271,14 @@ void ConnectionsService::PrintConnections( ...@@ -267,12 +271,14 @@ void ConnectionsService::PrintConnections(
strcpy(rtt_display, "-"); strcpy(rtt_display, "-");
} }
os << bar << min_width(NameOfPoint(ptr->remote_side()), 19) << bar; os << bar << min_width(NameOfPoint(ptr->remote_side()), 19) << bar;
if (need_local) { if (is_channel_conn) {
if (ptr->local_side().port > 0) { if (ptr->local_side().port > 0) {
os << min_width(ptr->local_side().port, 5) << bar; os << min_width(ptr->local_side().port, 5) << bar;
} else { } else {
os << min_width("-", 5) << bar; os << min_width("-", 5) << bar;
} }
os << min_width(ptr->recent_error_count(), 10) << bar
<< min_width(ptr->isolated_times(), 7) << bar;
} }
os << SSLStateToYesNo(ptr->ssl_state(), use_html) << bar; os << SSLStateToYesNo(ptr->ssl_state(), use_html) << bar;
char protname[32]; char protname[32];
...@@ -367,7 +373,7 @@ void ConnectionsService::default_method( ...@@ -367,7 +373,7 @@ void ConnectionsService::default_method(
conns.insert(conns.end(), internal_conns.begin(), internal_conns.end()); conns.insert(conns.end(), internal_conns.begin(), internal_conns.end());
} }
os << "server_connection_count: " << num_conns << '\n'; os << "server_connection_count: " << num_conns << '\n';
PrintConnections(os, conns, use_html, server, false/*need_local*/); PrintConnections(os, conns, use_html, server, false/*is_channel_conn*/);
if (has_uncopied) { if (has_uncopied) {
// Notice that we don't put the link of givemeall directly because // Notice that we don't put the link of givemeall directly because
// people seeing the link are very likely to click it which may be // people seeing the link are very likely to click it which may be
...@@ -380,7 +386,7 @@ void ConnectionsService::default_method( ...@@ -380,7 +386,7 @@ void ConnectionsService::default_method(
SocketMapList(&conns); SocketMapList(&conns);
os << (use_html ? "<br>\n" : "\n") os << (use_html ? "<br>\n" : "\n")
<< "channel_connection_count: " << GetChannelConnectionCount() << '\n'; << "channel_connection_count: " << GetChannelConnectionCount() << '\n';
PrintConnections(os, conns, use_html, server, true/*need_local*/); PrintConnections(os, conns, use_html, server, true/*is_channel_conn*/);
if (use_html) { if (use_html) {
os << "</body></html>\n"; os << "</body></html>\n";
......
...@@ -21,9 +21,9 @@ ...@@ -21,9 +21,9 @@
namespace brpc { namespace brpc {
DEFINE_int32(circuit_breaker_short_window_size, 500, DEFINE_int32(circuit_breaker_short_window_size, 1500,
"Short window sample size."); "Short window sample size.");
DEFINE_int32(circuit_breaker_long_window_size, 1000, DEFINE_int32(circuit_breaker_long_window_size, 3000,
"Long window sample size."); "Long window sample size.");
DEFINE_int32(circuit_breaker_short_window_error_percent, 10, DEFINE_int32(circuit_breaker_short_window_error_percent, 10,
"The maximum error rate allowed by the short window, ranging from 0-99."); "The maximum error rate allowed by the short window, ranging from 0-99.");
...@@ -39,6 +39,8 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100, ...@@ -39,6 +39,8 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds"); "Minimum isolation duration in milliseconds");
DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
"Maximum isolation duration in milliseconds"); "Maximum isolation duration in milliseconds");
DEFINE_double(circuit_breaker_epsilon_value, 0.02,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
namespace { namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA. // EPSILON is used to generate the smoothing coefficient when calculating EMA.
...@@ -51,7 +53,9 @@ namespace { ...@@ -51,7 +53,9 @@ namespace {
// when window_size = 1000, // when window_size = 1000,
// EPSILON = 0.1, smooth = 0.9977 // EPSILON = 0.1, smooth = 0.9977
// EPSILON = 0.3, smooth = 0.9987 // EPSILON = 0.3, smooth = 0.9987
const double EPSILON = 0.1;
#define EPSILON (FLAGS_circuit_breaker_epsilon_value)
} // namepace } // namepace
CircuitBreaker::EmaErrorRecorder::EmaErrorRecorder(int window_size, CircuitBreaker::EmaErrorRecorder::EmaErrorRecorder(int window_size,
...@@ -115,8 +119,8 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, ...@@ -115,8 +119,8 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
int64_t ema_error_cost = int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed); _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
ema_error_cost += error_cost; ema_error_cost += error_cost;
int64_t max_error_cost = ema_latency * _window_size * const int64_t max_error_cost =
(_max_error_percent / 100.0) * (1.0 + EPSILON); ema_latency * _window_size * (_max_error_percent / 100.0) * (1.0 + EPSILON);
return ema_error_cost <= max_error_cost; return ema_error_cost <= max_error_cost;
} }
...@@ -147,8 +151,9 @@ CircuitBreaker::CircuitBreaker() ...@@ -147,8 +151,9 @@ CircuitBreaker::CircuitBreaker()
, _short_window(FLAGS_circuit_breaker_short_window_size, , _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent) FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms()) , _last_reset_time_ms(butil::cpuwide_time_ms())
, _broken(false) , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) { , _isolated_times(0)
, _broken(false) {
} }
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
...@@ -159,9 +164,7 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) { ...@@ -159,9 +164,7 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
_short_window.OnCallEnd(error_code, latency)) { _short_window.OnCallEnd(error_code, latency)) {
return true; return true;
} }
if (!_broken.exchange(true, butil::memory_order_acquire)) { MarkAsBroken();
UpdateIsolationDuration();
}
return false; return false;
} }
...@@ -172,6 +175,13 @@ void CircuitBreaker::Reset() { ...@@ -172,6 +175,13 @@ void CircuitBreaker::Reset() {
_broken.store(false, butil::memory_order_release); _broken.store(false, butil::memory_order_release);
} }
void CircuitBreaker::MarkAsBroken() {
if (!_broken.exchange(true, butil::memory_order_acquire)) {
_isolated_times.fetch_add(1, butil::memory_order_relaxed);
UpdateIsolationDuration();
}
}
void CircuitBreaker::UpdateIsolationDuration() { void CircuitBreaker::UpdateIsolationDuration() {
int64_t now_time_ms = butil::cpuwide_time_ms(); int64_t now_time_ms = butil::cpuwide_time_ms();
int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed); int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed);
...@@ -188,4 +198,5 @@ void CircuitBreaker::UpdateIsolationDuration() { ...@@ -188,4 +198,5 @@ void CircuitBreaker::UpdateIsolationDuration() {
_isolation_duration_ms.store(isolation_duration_ms, butil::memory_order_relaxed); _isolation_duration_ms.store(isolation_duration_ms, butil::memory_order_relaxed);
} }
} // namespace brpc } // namespace brpc
...@@ -38,12 +38,22 @@ public: ...@@ -38,12 +38,22 @@ public:
// Reset CircuitBreaker and clear history data. will erase the historical // Reset CircuitBreaker and clear history data. will erase the historical
// data and start sampling again. Before you call this method, you need to // data and start sampling again. Before you call this method, you need to
// ensure that no one else is calling OnCallEnd. // ensure that no one else is accessing CircuitBreaker.
void Reset(); void Reset();
// Mark the Socket as broken. Call this method when you want to isolate a
// node in advance. When this method is called multiple times in succession,
// only the first call will take effect.
void MarkAsBroken();
// Number of times marked as broken
int isolated_times() const {
return _isolated_times.load(butil::memory_order_relaxed);
}
// The duration that should be isolated when the socket fails in milliseconds. // The duration that should be isolated when the socket fails in milliseconds.
// The higher the frequency of socket errors, the longer the duration. // The higher the frequency of socket errors, the longer the duration.
int isolation_duration_ms() { int isolation_duration_ms() const {
return _isolation_duration_ms.load(butil::memory_order_relaxed); return _isolation_duration_ms.load(butil::memory_order_relaxed);
} }
...@@ -55,7 +65,7 @@ private: ...@@ -55,7 +65,7 @@ private:
EmaErrorRecorder(int windows_size, int max_error_percent); EmaErrorRecorder(int windows_size, int max_error_percent);
bool OnCallEnd(int error_code, int64_t latency); bool OnCallEnd(int error_code, int64_t latency);
void Reset(); void Reset();
private: private:
int64_t UpdateLatency(int64_t latency); int64_t UpdateLatency(int64_t latency);
bool UpdateErrorCost(int64_t latency, int64_t ema_latency); bool UpdateErrorCost(int64_t latency, int64_t ema_latency);
...@@ -72,8 +82,9 @@ private: ...@@ -72,8 +82,9 @@ private:
EmaErrorRecorder _long_window; EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window; EmaErrorRecorder _short_window;
int64_t _last_reset_time_ms; int64_t _last_reset_time_ms;
butil::atomic<bool> _broken;
butil::atomic<int> _isolation_duration_ms; butil::atomic<int> _isolation_duration_ms;
butil::atomic<int> _isolated_times;
butil::atomic<bool> _broken;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -696,11 +696,22 @@ inline bool does_error_affect_main_socket(int error_code) { ...@@ -696,11 +696,22 @@ inline bool does_error_affect_main_socket(int error_code) {
// entire RPC (specified by c->FailedInline()). // entire RPC (specified by c->FailedInline()).
void Controller::Call::OnComplete( void Controller::Call::OnComplete(
Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) { Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) {
if (enable_circuit_breaker && sending_sock) { if (stream_user_data) {
sending_sock->FeedbackCircuitBreaker(error_code, stream_user_data->DestroyStreamUserData(sending_sock, c, error_code, end_of_rpc);
butil::gettimeofday_us() - begin_time_us); stream_user_data = NULL;
}
if (sending_sock != NULL) {
if (error_code != 0) {
sending_sock->AddRecentError();
}
if (enable_circuit_breaker) {
sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
} }
switch (c->connection_type()) { switch (c->connection_type()) {
case CONNECTION_TYPE_UNKNOWN: case CONNECTION_TYPE_UNKNOWN:
break; break;
...@@ -758,6 +769,7 @@ void Controller::Call::OnComplete( ...@@ -758,6 +769,7 @@ void Controller::Call::OnComplete(
} }
break; break;
} }
if (ELOGOFF == error_code) { if (ELOGOFF == error_code) {
SocketUniquePtr sock; SocketUniquePtr sock;
if (Socket::Address(peer_id, &sock) == 0) { if (Socket::Address(peer_id, &sock) == 0) {
...@@ -765,18 +777,13 @@ void Controller::Call::OnComplete( ...@@ -765,18 +777,13 @@ void Controller::Call::OnComplete(
sock->SetLogOff(); sock->SetLogOff();
} }
} }
if (need_feedback) { if (need_feedback) {
const LoadBalancer::CallInfo info = const LoadBalancer::CallInfo info =
{ begin_time_us, peer_id, error_code, c }; { begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (stream_user_data) {
stream_user_data->DestroyStreamUserData(sending_sock, c, error_code, end_of_rpc);
stream_user_data = NULL;
}
// Release the `Socket' we used to send/receive data // Release the `Socket' we used to send/receive data
sending_sock.reset(NULL); sending_sock.reset(NULL);
} }
......
...@@ -178,6 +178,8 @@ public: ...@@ -178,6 +178,8 @@ public:
CircuitBreaker circuit_breaker; CircuitBreaker circuit_breaker;
butil::atomic<uint64_t> recent_error_count;
explicit SharedPart(SocketId creator_socket_id); explicit SharedPart(SocketId creator_socket_id);
~SharedPart(); ~SharedPart();
...@@ -193,7 +195,8 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2) ...@@ -193,7 +195,8 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
, in_num_messages(0) , in_num_messages(0)
, out_size(0) , out_size(0)
, out_num_messages(0) , out_num_messages(0)
, extended_stat(NULL) { , extended_stat(NULL)
, recent_error_count(0) {
} }
Socket::SharedPart::~SharedPart() { Socket::SharedPart::~SharedPart() {
...@@ -766,6 +769,7 @@ void Socket::Revive() { ...@@ -766,6 +769,7 @@ void Socket::Revive() {
SharedPart* sp = GetSharedPart(); SharedPart* sp = GetSharedPart();
if (sp) { if (sp) {
sp->circuit_breaker.Reset(); sp->circuit_breaker.Reset();
sp->recent_error_count.store(0, butil::memory_order_relaxed);
} }
// Set this flag to true since we add additional ref again // Set this flag to true since we add additional ref again
_recycle_flag.store(false, butil::memory_order_relaxed); _recycle_flag.store(false, butil::memory_order_relaxed);
...@@ -802,6 +806,29 @@ int Socket::ReleaseAdditionalReference() { ...@@ -802,6 +806,29 @@ int Socket::ReleaseAdditionalReference() {
return -1; return -1;
} }
void Socket::AddRecentError() {
SharedPart* sp = GetSharedPart();
if (sp) {
sp->recent_error_count.fetch_add(1, butil::memory_order_relaxed);
}
}
int64_t Socket::recent_error_count() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->recent_error_count.load(butil::memory_order_relaxed);
}
return 0;
}
int Socket::isolated_times() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->circuit_breaker.isolated_times();
}
return 0;
}
int Socket::SetFailed(int error_code, const char* error_fmt, ...) { int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
if (error_code == 0) { if (error_code == 0) {
CHECK(false) << "error_code is 0"; CHECK(false) << "error_code is 0";
...@@ -836,6 +863,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { ...@@ -836,6 +863,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side // by Channel to revive never-connected socket when server side
// comes online. // comes online.
if (_health_check_interval_s > 0) { if (_health_check_interval_s > 0) {
GetOrNewSharedPart( )->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt( PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()), new HealthCheckTask(id()),
butil::milliseconds_from_now(GetOrNewSharedPart()-> butil::milliseconds_from_now(GetOrNewSharedPart()->
...@@ -876,8 +904,9 @@ int Socket::SetFailed() { ...@@ -876,8 +904,9 @@ int Socket::SetFailed() {
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) { void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) { if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker"; if (SetFailed(main_socket_id()) == 0) {
SetFailed(main_socket_id()); LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker";
}
} }
} }
......
...@@ -317,6 +317,12 @@ public: ...@@ -317,6 +317,12 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4))); __attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id); static int SetFailed(SocketId id);
void AddRecentError();
int64_t recent_error_count() const;
int isolated_times() const;
void FeedbackCircuitBreaker(int error_code, int64_t latency_us); void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
bool Failed() const; bool Failed() const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment