Commit 61096437 authored by TousakaRin's avatar TousakaRin

Increase the duration of isolation

parent 7f20db9a
......@@ -16,6 +16,7 @@
#include <cmath>
#include <gflags/gflags.h>
#include <butil/time.h>
#include "brpc/circuit_breaker.h"
namespace brpc {
......@@ -34,6 +35,10 @@ DEFINE_int32(circuit_breaker_min_error_cost_us, 500,
DEFINE_int32(circuit_breaker_max_failed_latency_mutilple, 2,
"The maximum multiple of the latency of the failed request relative to "
"the average latency of the success requests.");
// Starting isolation duration of a CircuitBreaker; also the value the
// duration falls back to when a break happens long enough after the last
// reset (see CircuitBreaker::UpdateIsolationDuration).
DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds");
// Upper bound of the isolation duration. The same value is also used as the
// time window after a reset within which a new break doubles the duration.
DEFINE_int32(circuit_breaker_max_isolation_duration_ms, 300000,
"Maximum isolation duration in milliseconds");
namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
......@@ -147,17 +152,47 @@ CircuitBreaker::CircuitBreaker()
: _long_window(FLAGS_circuit_breaker_long_window_size,
FLAGS_circuit_breaker_long_window_error_percent)
, _short_window(FLAGS_circuit_breaker_short_window_size,
FLAGS_circuit_breaker_short_window_error_percent) {
FLAGS_circuit_breaker_short_window_error_percent)
, _last_reset_time_ms(butil::cpuwide_time_ms())
, _broken(false)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms) {
}
// Feeds the result of one finished call (error code and latency) into the
// long and the short error windows.
// NOTE(review): this text looks like a rendered diff hunk whose +/- markers
// were stripped: the two-line `return` right below appears to be the removed
// implementation and the `if`/`else` after it the replacement. Read literally,
// the `if`/`else` is unreachable -- confirm against the real file.
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
return _long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency);
// Returns true only when both windows return true. Otherwise the isolation
// duration is updated before false is returned. `&&` short-circuits, so the
// short window is not fed when the long window already returned false.
if(_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
} else {
UpdateIsolationDuration();
return false;
}
}
void CircuitBreaker::Reset() {
_long_window.Reset();
_short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release);
}
// Marks the breaker as broken and recomputes the isolation duration.
//
// Only the caller that flips `_broken` from false to true performs the
// update; every other caller returns immediately.
//
// - If the break happens less than
//   FLAGS_circuit_breaker_max_isolation_duration_ms after the last reset, the
//   duration is doubled and capped at that maximum.
// - Otherwise the duration falls back to
//   FLAGS_circuit_breaker_min_isolation_duration_ms.
void CircuitBreaker::UpdateIsolationDuration() {
    // Cheap relaxed pre-check first; the exchange only runs when the flag
    // still looks clear, and only the winner of the exchange continues.
    if (_broken.load(butil::memory_order_relaxed) ||
        _broken.exchange(true, butil::memory_order_acquire)) {
        return;
    }

    const int64_t now_time_ms = butil::cpuwide_time_ms();
    int isolation_duration_ms =
        _isolation_duration_ms.load(butil::memory_order_relaxed);
    const int max_isolation_duration_ms =
        FLAGS_circuit_breaker_max_isolation_duration_ms;
    const int min_isolation_duration_ms =
        FLAGS_circuit_breaker_min_isolation_duration_ms;

    if (now_time_ms - _last_reset_time_ms < max_isolation_duration_ms) {
        // Double in 64 bits before clamping: doubling an `int` directly is
        // signed overflow (undefined behaviour) once the configured maximum
        // exceeds INT_MAX / 2.
        const int64_t doubled_ms =
            2 * static_cast<int64_t>(isolation_duration_ms);
        isolation_duration_ms = static_cast<int>(
            std::min<int64_t>(doubled_ms, max_isolation_duration_ms));
    } else {
        isolation_duration_ms = min_isolation_duration_ms;
    }
    _isolation_duration_ms.store(isolation_duration_ms,
                                 butil::memory_order_relaxed);
}
} // namespace brpc
......@@ -40,7 +40,16 @@ public:
// data and start sampling again. Before you call this method, you need to
// ensure that no one else is calling OnCallEnd.
void Reset();
// How long, in milliseconds, the socket should be kept isolated after it
// fails. More frequent socket errors lead to a longer duration.
int isolation_duration_ms() {
    const int duration_ms =
        _isolation_duration_ms.load(butil::memory_order_relaxed);
    return duration_ms;
}
private:
// Marks the breaker as broken and recomputes the isolation duration: doubled
// (capped at the max flag) when the break comes soon after the last reset,
// otherwise set back to the min flag. Called from OnCallEnd() when a window
// reports failure.
void UpdateIsolationDuration();
class EmaErrorRecorder {
public:
EmaErrorRecorder(int windows_size, int max_error_percent);
......@@ -63,6 +72,9 @@ private:
EmaErrorRecorder _long_window;
EmaErrorRecorder _short_window;
// Time of construction or of the latest Reset(), from butil::cpuwide_time_ms().
int64_t _last_reset_time_ms;
// Set to true by UpdateIsolationDuration(), cleared by Reset().
butil::atomic<bool> _broken;
// Current isolation duration in milliseconds, exposed by isolation_duration_ms().
butil::atomic<int> _isolation_duration_ms;
};
} // namespace brpc
......
......@@ -839,15 +839,11 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// Do health-checking even if we're not connected before, needed
// by Channel to revive never-connected socket when server side
// comes online.
// FIXME(gejun): the initial delay should be related to uncommitted
// CircuitBreaker and shorter for occasional errors and longer for
// frequent errors.
// NOTE: the delay should be positive right now to avoid HC timing
// issues in UT.
if (_health_check_interval_s > 0) {
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
butil::milliseconds_from_now(100/*NOTE*/));
butil::milliseconds_from_now(GetOrNewSharedPart()->
circuit_breaker.isolation_duration_ms()));
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
......@@ -872,7 +868,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// Socket's reference will hit 0(recycle) when no one addresses it.
ReleaseAdditionalReference();
// NOTE: This Socket may be recycled at this point, don't
// touch anything.
// touch anything.
return 0;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment