Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
a2bb114e
Commit
a2bb114e
authored
May 07, 2019
by
helei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix data race for circuit breaker
parent
cc6642bd
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
33 additions
and
23 deletions
+33
-23
circuit_breaker.cpp
src/brpc/circuit_breaker.cpp
+18
-10
circuit_breaker.h
src/brpc/circuit_breaker.h
+9
-8
socket.cpp
src/brpc/socket.cpp
+6
-5
No files found.
src/brpc/circuit_breaker.cpp
View file @
a2bb114e
...
@@ -39,7 +39,7 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
...
@@ -39,7 +39,7 @@ DEFINE_int32(circuit_breaker_min_isolation_duration_ms, 100,
"Minimum isolation duration in milliseconds"
);
"Minimum isolation duration in milliseconds"
);
DEFINE_int32
(
circuit_breaker_max_isolation_duration_ms
,
30000
,
DEFINE_int32
(
circuit_breaker_max_isolation_duration_ms
,
30000
,
"Maximum isolation duration in milliseconds"
);
"Maximum isolation duration in milliseconds"
);
DEFINE_double
(
circuit_breaker_epsilon_value
,
0.02
,
DEFINE_double
(
circuit_breaker_epsilon_value
,
0.02
,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"
);
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)"
);
namespace
{
namespace
{
...
@@ -81,14 +81,14 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
...
@@ -81,14 +81,14 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
healthy
=
UpdateErrorCost
(
latency
,
ema_latency
);
healthy
=
UpdateErrorCost
(
latency
,
ema_latency
);
}
}
// When the window is initializing, use error_rate to determine
// When the window is initializing, use error_rate to determine
// if it needs to be isolated.
// if it needs to be isolated.
if
(
_sample_count_when_initializing
.
load
(
butil
::
memory_order_relaxed
)
<
_window_size
&&
if
(
_sample_count_when_initializing
.
load
(
butil
::
memory_order_relaxed
)
<
_window_size
&&
_sample_count_when_initializing
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
)
<
_window_size
)
{
_sample_count_when_initializing
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
)
<
_window_size
)
{
if
(
error_code
!=
0
)
{
if
(
error_code
!=
0
)
{
const
int32_t
error_count
=
const
int32_t
error_count
=
_error_count_when_initializing
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
_error_count_when_initializing
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
return
error_count
<
_window_size
*
_max_error_percent
/
100
;
return
error_count
<
_window_size
*
_max_error_percent
/
100
;
}
}
// Because once OnCallEnd returned false, the node will be ioslated soon,
// Because once OnCallEnd returned false, the node will be ioslated soon,
// so when error_code=0, we no longer check the error count.
// so when error_code=0, we no longer check the error count.
...
@@ -99,10 +99,12 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
...
@@ -99,10 +99,12 @@ bool CircuitBreaker::EmaErrorRecorder::OnCallEnd(int error_code,
}
}
void
CircuitBreaker
::
EmaErrorRecorder
::
Reset
()
{
void
CircuitBreaker
::
EmaErrorRecorder
::
Reset
()
{
_sample_count_when_initializing
.
store
(
0
,
butil
::
memory_order_relaxed
);
if
(
_sample_count_when_initializing
.
load
(
butil
::
memory_order_relaxed
)
<
_window_size
)
{
_error_count_when_initializing
.
store
(
0
,
butil
::
memory_order_relaxed
);
_sample_count_when_initializing
.
store
(
0
,
butil
::
memory_order_relaxed
);
_error_count_when_initializing
.
store
(
0
,
butil
::
memory_order_relaxed
);
_ema_latency
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
_ema_error_cost
.
store
(
0
,
butil
::
memory_order_relaxed
);
_ema_error_cost
.
store
(
0
,
butil
::
memory_order_relaxed
);
_ema_latency
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
}
int64_t
CircuitBreaker
::
EmaErrorRecorder
::
UpdateLatency
(
int64_t
latency
)
{
int64_t
CircuitBreaker
::
EmaErrorRecorder
::
UpdateLatency
(
int64_t
latency
)
{
...
@@ -162,9 +164,10 @@ CircuitBreaker::CircuitBreaker()
...
@@ -162,9 +164,10 @@ CircuitBreaker::CircuitBreaker()
FLAGS_circuit_breaker_long_window_error_percent
)
FLAGS_circuit_breaker_long_window_error_percent
)
,
_short_window
(
FLAGS_circuit_breaker_short_window_size
,
,
_short_window
(
FLAGS_circuit_breaker_short_window_size
,
FLAGS_circuit_breaker_short_window_error_percent
)
FLAGS_circuit_breaker_short_window_error_percent
)
,
_last_re
set
_time_ms
(
butil
::
cpuwide_time_ms
())
,
_last_re
vived
_time_ms
(
butil
::
cpuwide_time_ms
())
,
_isolation_duration_ms
(
FLAGS_circuit_breaker_min_isolation_duration_ms
)
,
_isolation_duration_ms
(
FLAGS_circuit_breaker_min_isolation_duration_ms
)
,
_isolated_times
(
0
)
,
_isolated_times
(
0
)
,
_is_first_call_after_revived
(
true
)
,
_broken
(
false
)
{
,
_broken
(
false
)
{
}
}
...
@@ -172,6 +175,10 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
...
@@ -172,6 +175,10 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
if
(
_broken
.
load
(
butil
::
memory_order_relaxed
))
{
if
(
_broken
.
load
(
butil
::
memory_order_relaxed
))
{
return
false
;
return
false
;
}
}
if
(
_is_first_call_after_revived
.
load
(
butil
::
memory_order_relaxed
)
&&
_is_first_call_after_revived
.
exchange
(
false
,
butil
::
memory_order_relaxed
))
{
_last_revived_time_ms
.
store
(
butil
::
cpuwide_time_ms
(),
butil
::
memory_order_relaxed
);
}
if
(
_long_window
.
OnCallEnd
(
error_code
,
latency
)
&&
if
(
_long_window
.
OnCallEnd
(
error_code
,
latency
)
&&
_short_window
.
OnCallEnd
(
error_code
,
latency
))
{
_short_window
.
OnCallEnd
(
error_code
,
latency
))
{
return
true
;
return
true
;
...
@@ -183,7 +190,8 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
...
@@ -183,7 +190,8 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
void
CircuitBreaker
::
Reset
()
{
void
CircuitBreaker
::
Reset
()
{
_long_window
.
Reset
();
_long_window
.
Reset
();
_short_window
.
Reset
();
_short_window
.
Reset
();
_last_reset_time_ms
=
butil
::
cpuwide_time_ms
();
_last_revived_time_ms
.
store
(
butil
::
cpuwide_time_ms
(),
butil
::
memory_order_relaxed
);
_is_first_call_after_revived
.
store
(
true
,
butil
::
memory_order_relaxed
);
_broken
.
store
(
false
,
butil
::
memory_order_release
);
_broken
.
store
(
false
,
butil
::
memory_order_release
);
}
}
...
@@ -201,7 +209,7 @@ void CircuitBreaker::UpdateIsolationDuration() {
...
@@ -201,7 +209,7 @@ void CircuitBreaker::UpdateIsolationDuration() {
FLAGS_circuit_breaker_max_isolation_duration_ms
;
FLAGS_circuit_breaker_max_isolation_duration_ms
;
const
int
min_isolation_duration_ms
=
const
int
min_isolation_duration_ms
=
FLAGS_circuit_breaker_min_isolation_duration_ms
;
FLAGS_circuit_breaker_min_isolation_duration_ms
;
if
(
now_time_ms
-
_last_re
set
_time_ms
<
max_isolation_duration_ms
)
{
if
(
now_time_ms
-
_last_re
vived
_time_ms
<
max_isolation_duration_ms
)
{
isolation_duration_ms
=
isolation_duration_ms
=
std
::
min
(
isolation_duration_ms
*
2
,
max_isolation_duration_ms
);
std
::
min
(
isolation_duration_ms
*
2
,
max_isolation_duration_ms
);
}
else
{
}
else
{
...
...
src/brpc/circuit_breaker.h
View file @
a2bb114e
// Copyright (c) 2014 Baidu, Inc.G
// Copyright (c) 2014 Baidu, Inc.G
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// You may obtain a copy of the License at
...
@@ -16,7 +16,7 @@
...
@@ -16,7 +16,7 @@
#ifndef BRPC_CIRCUIT_BREAKER_H
#ifndef BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H
#define BRPC_CIRCUIT_BREAKER_H
#include "butil/atomicops.h"
#include "butil/atomicops.h"
namespace
brpc
{
namespace
brpc
{
...
@@ -27,22 +27,22 @@ public:
...
@@ -27,22 +27,22 @@ public:
~
CircuitBreaker
()
{}
~
CircuitBreaker
()
{}
// Sampling the current rpc. Returns false if a node needs to
// Sampling the current rpc. Returns false if a node needs to
// be isolated. Otherwise return true.
// be isolated. Otherwise return true.
// error_code: Error_code of this call, 0 means success.
// error_code: Error_code of this call, 0 means success.
// latency: Time cost of this call.
// latency: Time cost of this call.
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// Note: Once OnCallEnd() determined that a node needs to be isolated,
// it will always return false until you call Reset(). Usually Reset()
// it will always return false until you call Reset(). Usually Reset()
// will be called in the health check thread.
// will be called in the health check thread.
bool
OnCallEnd
(
int
error_code
,
int64_t
latency
);
bool
OnCallEnd
(
int
error_code
,
int64_t
latency
);
// Reset CircuitBreaker and clear history data. will erase the historical
// Reset CircuitBreaker and clear history data. will erase the historical
// data and start sampling again. Before you call this method, you need to
// data and start sampling again. Before you call this method, you need to
// ensure that no one else is accessing CircuitBreaker.
// ensure that no one else is accessing CircuitBreaker.
void
Reset
();
void
Reset
();
// Mark the Socket as broken. Call this method when you want to isolate a
// Mark the Socket as broken. Call this method when you want to isolate a
// node in advance. When this method is called multiple times in succession,
// node in advance. When this method is called multiple times in succession,
// only the first call will take effect.
// only the first call will take effect.
void
MarkAsBroken
();
void
MarkAsBroken
();
...
@@ -82,9 +82,10 @@ private:
...
@@ -82,9 +82,10 @@ private:
EmaErrorRecorder
_long_window
;
EmaErrorRecorder
_long_window
;
EmaErrorRecorder
_short_window
;
EmaErrorRecorder
_short_window
;
int64_t
_last_reset_time_ms
;
butil
::
atomic
<
int64_t
>
_last_revived_time_ms
;
butil
::
atomic
<
int
>
_isolation_duration_ms
;
butil
::
atomic
<
int
>
_isolation_duration_ms
;
butil
::
atomic
<
int
>
_isolated_times
;
butil
::
atomic
<
int
>
_isolated_times
;
butil
::
atomic
<
bool
>
_is_first_call_after_revived
;
butil
::
atomic
<
bool
>
_broken
;
butil
::
atomic
<
bool
>
_broken
;
};
};
...
...
src/brpc/socket.cpp
View file @
a2bb114e
...
@@ -728,6 +728,12 @@ int Socket::WaitAndReset(int32_t expected_nref) {
...
@@ -728,6 +728,12 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q
->
clear
();
_pipeline_q
->
clear
();
}
}
}
}
SharedPart
*
sp
=
GetSharedPart
();
if
(
sp
)
{
sp
->
circuit_breaker
.
Reset
();
sp
->
recent_error_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
return
0
;
return
0
;
}
}
...
@@ -750,11 +756,6 @@ void Socket::Revive() {
...
@@ -750,11 +756,6 @@ void Socket::Revive() {
vref
,
MakeVRef
(
id_ver
,
nref
+
1
/*note*/
),
vref
,
MakeVRef
(
id_ver
,
nref
+
1
/*note*/
),
butil
::
memory_order_release
,
butil
::
memory_order_release
,
butil
::
memory_order_relaxed
))
{
butil
::
memory_order_relaxed
))
{
SharedPart
*
sp
=
GetSharedPart
();
if
(
sp
)
{
sp
->
circuit_breaker
.
Reset
();
sp
->
recent_error_count
.
store
(
0
,
butil
::
memory_order_relaxed
);
}
// Set this flag to true since we add additional ref again
// Set this flag to true since we add additional ref again
_recycle_flag
.
store
(
false
,
butil
::
memory_order_relaxed
);
_recycle_flag
.
store
(
false
,
butil
::
memory_order_relaxed
);
if
(
_user
)
{
if
(
_user
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment