Unverified commit cecb5785, authored by Ge Jun, committed by GitHub

Merge pull request #701 from TousakaRin/circuit_breaker

CircuitBreaker: fix race condition, adjust reset policy
parents bfa2908d f4f4791a
...@@ -39,7 +39,7 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100, ...@@ -39,7 +39,7 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds"); "Minimum isolation duration in milliseconds");
DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000, DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 30000,
"Maximum isolation duration in milliseconds"); "Maximum isolation duration in milliseconds");
DEFINE_double(circuit_breaker_epsilon_value, 0.02, DEFINE_double(circuit_breaker_epsilon_value, 0.02,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"); "ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
namespace { namespace {
...@@ -81,14 +81,14 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code, ...@@ -81,14 +81,14 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
healthy = UpdateErrorCost(latency, ema_latency); healthy = UpdateErrorCost(latency, ema_latency);
} }
// When the window is initializing, use error_rate to determine // When the window is initializing, use error_rate to determine
// if it needs to be isolated. // if it needs to be isolated.
if (_sample_count_when_initializing.load(butil::memory_order_relaxed) < _window_size && if (_sample_count_when_initializing.load(butil::memory_order_relaxed) < _window_size &&
_sample_count_when_initializing.fetch_add(1, butil::memory_order_relaxed) < _window_size) { _sample_count_when_initializing.fetch_add(1, butil::memory_order_relaxed) < _window_size) {
if (error_code != 0) { if (error_code != 0) {
const int32_t error_count = const int32_t error_count =
_error_count_when_initializing.fetch_add(1, butil::memory_order_relaxed); _error_count_when_initializing.fetch_add(1, butil::memory_order_relaxed);
return error_count < _window_size * _max_error_percent / 100; return error_count < _window_size * _max_error_percent / 100;
} }
// Because once OnCallEnd returned false, the node will be isolated soon, // Because once OnCallEnd returned false, the node will be isolated soon,
// so when error_code=0, we no longer check the error count. // so when error_code=0, we no longer check the error count.
...@@ -99,10 +99,12 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code, ...@@ -99,10 +99,12 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
} }
void CircuitBreaker::EmaErrorRecorder::Reset() { void CircuitBreaker::EmaErrorRecorder::Reset() {
_sample_count_when_initializing.store(0, butil::memory_order_relaxed); if (_sample_count_when_initializing.load(butil::memory_order_relaxed) < _window_size) {
_error_count_when_initializing.store(0, butil::memory_order_relaxed); _sample_count_when_initializing.store(0, butil::memory_order_relaxed);
_error_count_when_initializing.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
}
_ema_error_cost.store(0, butil::memory_order_relaxed); _ema_error_cost.store(0, butil::memory_order_relaxed);
_ema_latency.store(0, butil::memory_order_relaxed);
} }
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) { int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
...@@ -162,9 +164,9 @@ CircuitBreaker::CircuitBreaker() ...@@ -162,9 +164,9 @@ CircuitBreaker::CircuitBreaker()
FLAGS_circuit_breaker_long_window_error_percent) FLAGS_circuit_breaker_long_window_error_percent)
, _short_window(FLAGS_circuit_breaker_short_window_size, , _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent) FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms()) , _last_reset_time_ms(0)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) , _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolated_times(0) , _isolated_times(0)
, _broken(false) { , _broken(false) {
} }
......
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2014 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#ifndef BRPC_CIRCUIT_BREAKER_H #ifndef BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H #define BRPC_CIRCUIT_BREAKER_H
#include "butil/atomicops.h" #include "butil/atomicops.h"
namespace brpc { namespace brpc {
...@@ -27,22 +27,22 @@ public: ...@@ -27,22 +27,22 @@ public:
~CircuitBreaker() {} ~CircuitBreaker() {}
// Sampling the current rpc. Returns false if a node needs to // Sampling the current rpc. Returns false if a node needs to
// be isolated. Otherwise return true. // be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success. // error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call. // latency: Time cost of this call.
// Note: Once OnCallEnd() determined that a node needs to be isolated, // Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset() // it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread. // will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency); bool OnCallEnd(int error_code, int64_t latency);
// Reset CircuitBreaker and clear history data. This will erase the historical // Reset CircuitBreaker and clear history data. This will erase the historical
// data and start sampling again. Before you call this method, you need to // data and start sampling again. Before you call this method, you need to
// ensure that no one else is accessing CircuitBreaker. // ensure that no one else is accessing CircuitBreaker.
void Reset(); void Reset();
// Mark the Socket as broken. Call this method when you want to isolate a // Mark the Socket as broken. Call this method when you want to isolate a
// node in advance. When this method is called multiple times in succession, // node in advance. When this method is called multiple times in succession,
// only the first call will take effect. // only the first call will take effect.
void MarkAsBroken(); void MarkAsBroken();
...@@ -82,7 +82,7 @@ private: ...@@ -82,7 +82,7 @@ private:
EmaErrorRecorder _long_window; EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window; EmaErrorRecorder _short_window;
int64_t _last_reset_time_ms; int64_t _last_reset_time_ms;
butil::atomic<int> _isolation_duration_ms; butil::atomic<int> _isolation_duration_ms;
butil::atomic<int> _isolated_times; butil::atomic<int> _isolated_times;
butil::atomic<bool> _broken; butil::atomic<bool> _broken;
......
...@@ -728,6 +728,12 @@ int Socket::WaitAndReset(int32_t expected_nref) { ...@@ -728,6 +728,12 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q->clear(); _pipeline_q->clear();
} }
} }
SharedPart* sp = GetSharedPart();
if (sp) {
sp->circuit_breaker.Reset();
sp->recent_error_count.store(0, butil::memory_order_relaxed);
}
return 0; return 0;
} }
...@@ -750,11 +756,6 @@ void Socket::Revive() { ...@@ -750,11 +756,6 @@ void Socket::Revive() {
vref, MakeVRef(id_ver, nref + 1/*note*/), vref, MakeVRef(id_ver, nref + 1/*note*/),
butil::memory_order_release, butil::memory_order_release,
butil::memory_order_relaxed)) { butil::memory_order_relaxed)) {
SharedPart* sp = GetSharedPart();
if (sp) {
sp->circuit_breaker.Reset();
sp->recent_error_count.store(0, butil::memory_order_relaxed);
}
// Set this flag to true since we add additional ref again // Set this flag to true since we add additional ref again
_recycle_flag.store(false, butil::memory_order_relaxed); _recycle_flag.store(false, butil::memory_order_relaxed);
if (_user) { if (_user) {
......
...@@ -22,8 +22,8 @@ const int kShortWindowSize = 500; ...@@ -22,8 +22,8 @@ const int kShortWindowSize = 500;
const int kLongWindowSize = 1000; const int kLongWindowSize = 1000;
const int kShortWindowErrorPercent = 10; const int kShortWindowErrorPercent = 10;
const int kLongWindowErrorPercent = 5; const int kLongWindowErrorPercent = 5;
const int kMinIsolationDurationMs = 100; const int kMinIsolationDurationMs = 10;
const int kMaxIsolationDurationMs = 1000; const int kMaxIsolationDurationMs = 200;
const int kErrorCodeForFailed = 131; const int kErrorCodeForFailed = 131;
const int kErrorCodeForSucc = 0; const int kErrorCodeForSucc = 0;
const int kErrorCost = 1000; const int kErrorCost = 1000;
...@@ -60,8 +60,8 @@ struct FeedbackControl { ...@@ -60,8 +60,8 @@ struct FeedbackControl {
: _req_num(req_num) : _req_num(req_num)
, _error_percent(error_percent) , _error_percent(error_percent)
, _circuit_breaker(circuit_breaker) , _circuit_breaker(circuit_breaker)
, _healthy_cnt(0) , _healthy_cnt(0)
, _unhealthy_cnt(0) , _unhealthy_cnt(0)
, _healthy(true) {} , _healthy(true) {}
int _req_num; int _req_num;
int _error_percent; int _error_percent;
...@@ -86,7 +86,7 @@ protected: ...@@ -86,7 +86,7 @@ protected:
for (int i = 0; i < fc->_req_num; ++i) { for (int i = 0; i < fc->_req_num; ++i) {
bool healthy = false; bool healthy = false;
if (rand() % 100 < fc->_error_percent) { if (rand() % 100 < fc->_error_percent) {
healthy = fc->_circuit_breaker->OnCallEnd(kErrorCodeForFailed, kErrorCost); healthy = fc->_circuit_breaker->OnCallEnd(kErrorCodeForFailed, kErrorCost);
} else { } else {
healthy = fc->_circuit_breaker->OnCallEnd(kErrorCodeForSucc, kLatency); healthy = fc->_circuit_breaker->OnCallEnd(kErrorCodeForSucc, kLatency);
} }
...@@ -100,7 +100,7 @@ protected: ...@@ -100,7 +100,7 @@ protected:
return fc; return fc;
} }
void StartFeedbackThread(std::vector<pthread_t>* thread_list, void StartFeedbackThread(std::vector<pthread_t>* thread_list,
std::vector<std::unique_ptr<FeedbackControl>>* fc_list, std::vector<std::unique_ptr<FeedbackControl>>* fc_list,
int error_percent) { int error_percent) {
thread_list->clear(); thread_list->clear();
...@@ -110,7 +110,7 @@ protected: ...@@ -110,7 +110,7 @@ protected:
FeedbackControl* fc = FeedbackControl* fc =
new FeedbackControl(2 * kLongWindowSize, error_percent, &_circuit_breaker); new FeedbackControl(2 * kLongWindowSize, error_percent, &_circuit_breaker);
fc_list->emplace_back(fc); fc_list->emplace_back(fc);
pthread_create(&tid, NULL, feed_back_thread, fc); pthread_create(&tid, nullptr, feed_back_thread, fc);
thread_list->push_back(tid); thread_list->push_back(tid);
} }
} }
...@@ -123,35 +123,46 @@ TEST_F(CircuitBreakerTest, should_not_isolate) { ...@@ -123,35 +123,46 @@ TEST_F(CircuitBreakerTest, should_not_isolate) {
std::vector<std::unique_ptr<FeedbackControl>> fc_list; std::vector<std::unique_ptr<FeedbackControl>> fc_list;
StartFeedbackThread(&thread_list, &fc_list, 3); StartFeedbackThread(&thread_list, &fc_list, 3);
for (int i = 0; i < kThreadNum; ++i) { for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = NULL; void* ret_data = nullptr;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0); ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_EQ(fc->_unhealthy_cnt, 0); EXPECT_EQ(fc->_unhealthy_cnt, 0);
EXPECT_TRUE(fc->_healthy); EXPECT_TRUE(fc->_healthy);
} }
} }
TEST_F(CircuitBreakerTest, should_isolate) { TEST_F(CircuitBreakerTest, should_isolate) {
std::vector<pthread_t> thread_list; std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list; std::vector<std::unique_ptr<FeedbackControl>> fc_list;
StartFeedbackThread(&thread_list, &fc_list, 50); StartFeedbackThread(&thread_list, &fc_list, 50);
for (int i = 0; i < kThreadNum; ++i) { for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = NULL; void* ret_data = nullptr;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0); ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_GT(fc->_unhealthy_cnt, 0); EXPECT_GT(fc->_unhealthy_cnt, 0);
EXPECT_FALSE(fc->_healthy); EXPECT_FALSE(fc->_healthy);
} }
} }
TEST_F(CircuitBreakerTest, isolation_duration_grow) { TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) {
_circuit_breaker.Reset();
std::vector<pthread_t> thread_list; std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list; std::vector<std::unique_ptr<FeedbackControl>> fc_list;
StartFeedbackThread(&thread_list, &fc_list, 100); StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) { for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = NULL; void* ret_data = nullptr;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0); ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
}
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs);
_circuit_breaker.Reset();
StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = nullptr;
ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy); EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
...@@ -160,11 +171,10 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) { ...@@ -160,11 +171,10 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 2); EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 2);
_circuit_breaker.Reset(); _circuit_breaker.Reset();
bthread_usleep(kMinIsolationDurationMs * 1000);
StartFeedbackThread(&thread_list, &fc_list, 100); StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) { for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = NULL; void* ret_data = nullptr;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0); ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy); EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
...@@ -173,15 +183,38 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) { ...@@ -173,15 +183,38 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 4); EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 4);
_circuit_breaker.Reset(); _circuit_breaker.Reset();
bthread_usleep((kMaxIsolationDurationMs + kMinIsolationDurationMs) * 1000); ::usleep((kMaxIsolationDurationMs + kMinIsolationDurationMs) * 1000);
StartFeedbackThread(&thread_list, &fc_list, 100); StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) { for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = NULL; void* ret_data = nullptr;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0); ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data); FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy); EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize); EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0); EXPECT_GT(fc->_unhealthy_cnt, 0);
} }
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs); EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs);
}
TEST_F(CircuitBreakerTest, maximum_isolation_duration) {
brpc::FLAGS_circuit_breaker_max_isolation_duration_ms =
brpc::FLAGS_circuit_breaker_min_isolation_duration_ms + 1;
ASSERT_LT(brpc::FLAGS_circuit_breaker_max_isolation_duration_ms,
2 * brpc::FLAGS_circuit_breaker_min_isolation_duration_ms);
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
_circuit_breaker.Reset();
StartFeedbackThread(&thread_list, &fc_list, 100);
for (int i = 0; i < kThreadNum; ++i) {
void* ret_data = nullptr;
ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
}
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(),
brpc::FLAGS_circuit_breaker_max_isolation_duration_ms);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment