Unverified Commit ce503273 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #413 from TousakaRin/circuit_breaker

Circuit breaker
parents 74abadbe 620811e4
......@@ -44,6 +44,7 @@ ChannelOptions::ChannelOptions()
, timeout_ms(500)
, backup_request_ms(-1)
, max_retry(3)
, enable_circuit_breaker(false)
, protocol(PROTOCOL_BAIDU_STD)
, connection_type(CONNECTION_TYPE_UNKNOWN)
, succeed_without_server(true)
......@@ -383,6 +384,9 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
cntl->_request_protocol = _options.protocol;
cntl->_preferred_index = _preferred_index;
cntl->_retry_policy = _options.retry_policy;
if (_options.enable_circuit_breaker) {
cntl->add_flag(Controller::FLAGS_ENABLED_CIRCUIT_BREAKER);
}
const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range(
correlation_id, NULL, 2 + cntl->max_retry());
......
......@@ -69,6 +69,12 @@ struct ChannelOptions {
// Maximum: INT_MAX
int max_retry;
// When the error rate of a server node is too high, isolate the node.
// Note that this isolation is GLOBAL, the node will become unavailable
// for all channels running in this process during the isolation.
// Default: false
bool enable_circuit_breaker;
// Serialization protocol, defined in src/brpc/options.proto
// NOTE: You can assign name of the protocol to this field as well, for
// Example: options.protocol = "baidu_std";
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include <cmath>
#include <gflags/gflags.h>
#include <butil/time.h>
#include "brpc/circuit_breaker.h"
namespace brpc {
DEFINE_int32(circuit_breaker_short_window_size, 500,
"Short window sample size.");
DEFINE_int32(circuit_breaker_long_window_size, 1000,
"Long window sample size.");
DEFINE_int32(circuit_breaker_short_window_error_percent, 10,
"The maximum error rate allowed by the short window, ranging from 0-99.");
DEFINE_int32(circuit_breaker_long_window_error_percent, 5,
"The maximum error rate allowed by the long window, ranging from 0-99.");
DEFINE_int32(circuit_breaker_min_error_cost_us, 500,
"The minimum error_cost, when the ema of error cost is less than this "
"value, it will be set to zero.");
DEFINE_int32(circuit_breaker_max_failed_latency_mutiple, 2,
"The maximum multiple of the latency of the failed request relative to "
"the average latency of the success requests.");
DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds");
DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
"Maximum isolation duration in milliseconds");
namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
// The larger the EPSILON, the larger the smoothing coefficient, which means
// that the proportion of early data is larger.
// smooth = pow(EPSILON, 1 / window_size),
// eg: when window_size = 100,
// EPSILON = 0.1, smooth = 0.9772
// EPSILON = 0.3, smooth = 0.9880
// when window_size = 1000,
// EPSILON = 0.1, smooth = 0.9977
// EPSILON = 0.3, smooth = 0.9987
const double EPSILON = 0.1;
} // namepace
CircuitBreaker::EmaErrorRecorder::EmaErrorRecorder(int window_size,
int max_error_percent)
: _window_size(window_size)
, _max_error_percent(max_error_percent)
, _smooth(std::pow(EPSILON, 1.0/window_size))
, _sample_count(0)
, _ema_error_cost(0)
, _ema_latency(0) {
}
bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
int64_t latency) {
int64_t ema_latency = 0;
bool healthy = false;
if (error_code == 0) {
ema_latency = UpdateLatency(latency);
healthy = UpdateErrorCost(0, ema_latency);
} else {
ema_latency = _ema_latency.load(butil::memory_order_relaxed);
healthy = UpdateErrorCost(latency, ema_latency);
}
if (_sample_count.fetch_add(1, butil::memory_order_relaxed) < _window_size) {
return true;
}
return healthy;
}
void CircuitBreaker::EmaErrorRecorder::Reset() {
_sample_count.store(0, butil::memory_order_relaxed);
_ema_error_cost.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
}
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
do {
int64_t next_ema_latency = 0;
if (0 == ema_latency) {
next_ema_latency = latency;
} else {
next_ema_latency = ema_latency * _smooth + latency * (1 - _smooth);
}
if (_ema_latency.compare_exchange_weak(ema_latency, next_ema_latency)) {
return next_ema_latency;
}
} while(true);
}
bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
int64_t ema_latency) {
const int max_mutiple = FLAGS_circuit_breaker_max_failed_latency_mutiple;
if (ema_latency != 0) {
error_cost = std::min(ema_latency * max_mutiple, error_cost);
}
//Errorous response
if (error_cost != 0) {
int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
ema_error_cost += error_cost;
int64_t max_error_cost = ema_latency * _window_size *
(_max_error_percent / 100.0) * (1.0 + EPSILON);
return ema_error_cost <= max_error_cost;
}
//Ordinary response
int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
do {
if (ema_error_cost == 0) {
break;
} else if (ema_error_cost < FLAGS_circuit_breaker_min_error_cost_us) {
if (_ema_error_cost.compare_exchange_weak(
ema_error_cost, 0, butil::memory_order_relaxed)) {
break;
}
} else {
int64_t next_ema_error_cost = ema_error_cost * _smooth;
if (_ema_error_cost.compare_exchange_weak(
ema_error_cost, next_ema_error_cost)) {
break;
}
}
} while (true);
return true;
}
CircuitBreaker::CircuitBreaker()
: _long_window(FLAGS_circuit_breaker_long_window_size,
FLAGS_circuit_breaker_long_window_error_percent)
, _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms())
, _broken(false)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) {
}
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
if (!_long_window.OnCallEnd(error_code, latency) ||
!_short_window.OnCallEnd(error_code, latency)) {
if (!_broken.exchange(true, butil::memory_order_acquire)) {
UpdateIsolationDuration();
}
return false;
}
return true;
}
void CircuitBreaker::Reset() {
_long_window.Reset();
_short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release);
}
void CircuitBreaker::UpdateIsolationDuration() {
int64_t now_time_ms = butil::cpuwide_time_ms();
int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed);
const int max_isolation_duration_ms =
FLAGS_circuit_breaker_max_isolation_duration_ms;
const int min_isolation_duration_ms =
FLAGS_circuit_breaker_min_isolation_duration_ms;
if (now_time_ms - _last_reset_time_ms < max_isolation_duration_ms) {
isolation_duration_ms =
std::min(isolation_duration_ms * 2, max_isolation_duration_ms);
} else {
isolation_duration_ms = min_isolation_duration_ms;
}
_isolation_duration_ms.store(isolation_duration_ms, butil::memory_order_relaxed);
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H
#include "butil/atomicops.h"
namespace brpc {
class CircuitBreaker {
public:
CircuitBreaker();
~CircuitBreaker() {}
// Sampling the current rpc. Returns false if a node needs to
// be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call.
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency);
// Reset CircuitBreaker and clear history data. will erase the historical
// data and start sampling again. Before you call this method, you need to
// ensure that no one else is calling OnCallEnd.
void Reset();
// The duration that should be isolated when the socket fails in milliseconds.
// The higher the frequency of socket errors, the longer the duration.
int isolation_duration_ms() {
return _isolation_duration_ms.load(butil::memory_order_relaxed);
}
private:
void UpdateIsolationDuration();
class EmaErrorRecorder {
public:
EmaErrorRecorder(int windows_size, int max_error_percent);
bool OnCallEnd(int error_code, int64_t latency);
void Reset();
private:
int64_t UpdateLatency(int64_t latency);
bool UpdateErrorCost(int64_t latency, int64_t ema_latency);
const int _window_size;
const int _max_error_percent;
const double _smooth;
butil::atomic<int64_t> _sample_count;
butil::atomic<int64_t> _ema_error_cost;
butil::atomic<int64_t> _ema_latency;
};
EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window;
int64_t _last_reset_time_ms;
butil::atomic<bool> _broken;
butil::atomic<int> _isolation_duration_ms;
};
} // namespace brpc
#endif // BRPC_CIRCUIT_BREAKER_H_
......@@ -277,6 +277,7 @@ Controller::Call::~Call() {
void Controller::Call::Reset() {
nretry = 0;
need_feedback = false;
enable_circuit_breaker = false;
touched_by_stream_creator = false;
peer_id = (SocketId)-1;
begin_time_us = 0;
......@@ -763,6 +764,10 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
c->stream_creator()->CleanupSocketForStream(
sending_sock.get(), c, error_code);
}
if (enable_circuit_breaker && sending_sock) {
sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
// Release the `Socket' we used to send/receive data
sending_sock.reset(NULL);
......@@ -963,6 +968,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Pick a target server for sending RPC
_current_call.need_feedback = false;
_current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
SocketUniquePtr tmp_sock;
if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction
......
......@@ -130,6 +130,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
public:
Controller();
......@@ -559,11 +560,12 @@ private:
void Reset();
void OnComplete(Controller* c, int error_code, bool responded);
int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback.
bool touched_by_stream_creator;
SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time.
int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback.
bool enable_circuit_breaker; // The channel enabled circuit_breaker
bool touched_by_stream_creator;
SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time.
// The actual `Socket' for sending RPC. It's socket id will be
// exactly the same as `peer_id' if `_connection_type' is
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
......@@ -600,6 +602,10 @@ private:
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
bool has_enabled_circuit_breaker() const {
return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER);
}
private:
// NOTE: align and group fields to make Controller as compact as possible.
......
......@@ -36,6 +36,7 @@
#include "brpc/event_dispatcher.h" // RemoveConsumer
#include "brpc/socket.h"
#include "brpc/describable.h" // Describable
#include "brpc/circuit_breaker.h" // CircuitBreaker
#include "brpc/input_messenger.h"
#include "brpc/details/sparse_minute_counter.h"
#include "brpc/stream_impl.h"
......@@ -177,6 +178,8 @@ public:
// For computing stats.
ExtendedSocketStat* extended_stat;
CircuitBreaker circuit_breaker;
explicit SharedPart(SocketId creator_socket_id);
~SharedPart();
......@@ -762,6 +765,10 @@ void Socket::Revive() {
vref, MakeVRef(id_ver, nref + 1/*note*/),
butil::memory_order_release,
butil::memory_order_relaxed)) {
SharedPart* sp = GetSharedPart();
if (sp) {
sp->circuit_breaker.Reset();
}
// Set this flag to true since we add additional ref again
_recycle_flag.store(false, butil::memory_order_relaxed);
if (_user) {
......@@ -830,15 +837,11 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// Do health-checking even if we're not connected before, needed
// by Channel to revive never-connected socket when server side
// comes online.
// FIXME(gejun): the initial delay should be related to uncommited
// CircuitBreaker and shorter for occasional errors and longer for
// frequent errors.
// NOTE: the delay should be positive right now to avoid HC timing
// issues in UT.
if (_health_check_interval_s > 0) {
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
butil::milliseconds_from_now(100/*NOTE*/));
butil::milliseconds_from_now(GetOrNewSharedPart()->
circuit_breaker.isolation_duration_ms()));
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
......@@ -873,6 +876,13 @@ int Socket::SetFailed() {
return SetFailed(EFAILEDSOCKET, NULL);
}
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
LOG(ERROR) << "Socket[" << *this << "] isolated by circuit breaker";
SetFailed(main_socket_id());
}
}
int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
const int64_t last_active_us = last_active_time_us();
if (butil::cpuwide_time_us() - last_active_us <= idle_seconds * 1000000L) {
......
......@@ -315,6 +315,8 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id);
void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
bool Failed() const;
bool DidReleaseAdditionalRereference() const
......
......@@ -398,7 +398,7 @@ TEST(NamingServiceTest, consul_with_backup_file) {
restful_map.c_str()));
ASSERT_EQ(0, server.Start("localhost:8500", NULL));
bthread_usleep(2000000);
bthread_usleep(5000000);
butil::EndPoint n1;
ASSERT_EQ(0, butil::str2endpoint("10.121.36.189:8003", &n1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment