Commit aaf20ce8 authored by TousakaRin's avatar TousakaRin

Make the interface more robust

parent 74929cfc
......@@ -160,12 +160,20 @@ CircuitBreaker::CircuitBreaker()
FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms())
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolated_times(0) {
, _isolated_times(0)
, _broken(false) {
}
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
return _long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency);
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
if (_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
}
MarkAsBroken();
return false;
}
void CircuitBreaker::Reset() {
......@@ -175,8 +183,10 @@ void CircuitBreaker::Reset() {
}
void CircuitBreaker::MarkAsBroken() {
if (_broken.exchange(true, butil::memory_order_acquire)) {
++_isolated_times;
UpdateIsolationDuration();
}
}
int CircuitBreaker::health_score() const {
......
......@@ -31,6 +31,9 @@ public:
// be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call.
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
bool OnCallEnd(int error_code, int64_t latency);
// Reset CircuitBreaker and clear history data. will erase the historical
......@@ -38,8 +41,9 @@ public:
// ensure that no one else is calling OnCallEnd.
void Reset();
// Mark the Socket as broken. You should NOT call this method multiple
// times in succession.
// Mark the Socket as broken. Call this method when you want to isolate a
// node in advance. When this method is called multiple times in succession,
// only the first call will take effect.
void MarkAsBroken();
// The closer to 100, the less recent errors occurred, and 0 means that
......@@ -87,6 +91,7 @@ private:
int64_t _last_reset_time_ms;
int _isolation_duration_ms;
int _isolated_times;
butil::atomic<bool> _broken;
};
} // namespace brpc
......
......@@ -60,10 +60,14 @@ struct FeedbackControl {
: _req_num(req_num)
, _error_percent(error_percent)
, _circuit_breaker(circuit_breaker)
, _healthy_cnt(0)
, _unhealthy_cnt(0)
, _healthy(true) {}
int _req_num;
int _error_percent;
brpc::CircuitBreaker* _circuit_breaker;
int _healthy_cnt;
int _unhealthy_cnt;
bool _healthy;
};
......@@ -87,8 +91,10 @@ protected:
healthy = fc->_circuit_breaker->OnCallEnd(kErrorCodeForSucc, kLatency);
}
fc->_healthy = healthy;
if (!healthy) {
break;
if (healthy) {
++fc->_healthy_cnt;
} else {
++fc->_unhealthy_cnt;
}
}
return fc;
......@@ -120,11 +126,9 @@ TEST_F(CircuitBreakerTest, should_not_isolate) {
void* ret_data = NULL;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_EQ(fc->_unhealthy_cnt, 0);
EXPECT_TRUE(fc->_healthy);
}
// MarkAsBroken shoul be called only once.
_circuit_breaker.MarkAsBroken();
_circuit_breaker.Reset();
}
TEST_F(CircuitBreakerTest, should_isolate) {
......@@ -135,15 +139,12 @@ TEST_F(CircuitBreakerTest, should_isolate) {
void* ret_data = NULL;
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_GT(fc->_unhealthy_cnt, 0);
EXPECT_FALSE(fc->_healthy);
}
// MarkAsBroken shoul be called only once.
_circuit_breaker.MarkAsBroken();
_circuit_breaker.Reset();
}
TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs);
_circuit_breaker.Reset();
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
......@@ -153,9 +154,9 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
}
// MarkAsBroken shoul be called only once.
_circuit_breaker.MarkAsBroken();
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 2);
_circuit_breaker.Reset();
......@@ -166,9 +167,9 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
}
// MarkAsBroken shoul be called only once.
_circuit_breaker.MarkAsBroken();
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs * 4);
_circuit_breaker.Reset();
......@@ -179,8 +180,8 @@ TEST_F(CircuitBreakerTest, isolation_duration_grow) {
EXPECT_EQ(pthread_join(thread_list[i], &ret_data), 0);
FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
EXPECT_FALSE(fc->_healthy);
EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
EXPECT_GT(fc->_unhealthy_cnt, 0);
}
// MarkAsBroken shoul be called only once.
_circuit_breaker.MarkAsBroken();
EXPECT_EQ(_circuit_breaker.isolation_duration_ms(), kMinIsolationDurationMs);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment