Commit ead9f8bc authored by TousakaRin's avatar TousakaRin

Adjust the indicator being exposed

parent 88584757
...@@ -122,6 +122,8 @@ void ConnectionsService::PrintConnections( ...@@ -122,6 +122,8 @@ void ConnectionsService::PrintConnections(
os << "<th>SSL</th>" os << "<th>SSL</th>"
"<th>Protocol</th>" "<th>Protocol</th>"
"<th>fd</th>" "<th>fd</th>"
"<th>recent_err</th>"
"<th>nbreak</th>"
"<th>InBytes/s</th>" "<th>InBytes/s</th>"
"<th>In/s</th>" "<th>In/s</th>"
"<th>InBytes/m</th>" "<th>InBytes/m</th>"
...@@ -138,7 +140,7 @@ void ConnectionsService::PrintConnections( ...@@ -138,7 +140,7 @@ void ConnectionsService::PrintConnections(
if (need_local) { if (need_local) {
os << "Local|"; os << "Local|";
} }
os << "SSL|Protocol |fd |" os << "SSL|Protocol |fd |recent_err|nbreak|"
"InBytes/s|In/s |InBytes/m |In/m |" "InBytes/s|In/s |InBytes/m |In/m |"
"OutBytes/s|Out/s |OutBytes/m|Out/m |" "OutBytes/s|Out/s |OutBytes/m|Out/m |"
"Rtt/Var(ms)|SocketId\n"; "Rtt/Var(ms)|SocketId\n";
...@@ -174,9 +176,12 @@ void ConnectionsService::PrintConnections( ...@@ -174,9 +176,12 @@ void ConnectionsService::PrintConnections(
if (need_local) { if (need_local) {
os << min_width(ptr->local_side().port, 5) << bar; os << min_width(ptr->local_side().port, 5) << bar;
} }
os << min_width("-", 3) << bar os << min_width("-", 3) << bar
<< min_width("-", 12) << bar << min_width("-", 12) << bar
<< min_width("-", 5) << bar << min_width("-", 5) << bar
<< min_width(ptr->recent_error_count(), 11) << bar
<< min_width(ptr->isolated_times(), 7) << bar
<< min_width("-", 9) << bar << min_width("-", 9) << bar
<< min_width("-", 6) << bar << min_width("-", 6) << bar
<< min_width("-", 10) << bar << min_width("-", 10) << bar
...@@ -288,7 +293,9 @@ void ConnectionsService::PrintConnections( ...@@ -288,7 +293,9 @@ void ConnectionsService::PrintConnections(
} else { } else {
os << min_width("-", 5) << bar; os << min_width("-", 5) << bar;
} }
os << min_width(stat.in_size_s, 9) << bar os << min_width(ptr->recent_error_count(), 11) << bar
<< min_width(ptr->isolated_times(), 7) << bar
<< min_width(stat.in_size_s, 9) << bar
<< min_width(stat.in_num_messages_s, 6) << bar << min_width(stat.in_num_messages_s, 6) << bar
<< min_width(stat.in_size_m, 10) << bar << min_width(stat.in_size_m, 10) << bar
<< min_width(stat.in_num_messages_m, 8) << bar << min_width(stat.in_num_messages_m, 8) << bar
......
...@@ -89,20 +89,6 @@ void CircuitBreaker::EmaErrorRecorder::Reset() { ...@@ -89,20 +89,6 @@ void CircuitBreaker::EmaErrorRecorder::Reset() {
_ema_latency.store(0, butil::memory_order_relaxed); _ema_latency.store(0, butil::memory_order_relaxed);
} }
int64_t CircuitBreaker::EmaErrorRecorder::max_error_cost() const {
const int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
return ema_latency * _window_size * (_max_error_percent / 100.0) * (1.0 + EPSILON);
}
int CircuitBreaker::EmaErrorRecorder::health_score() const {
const int64_t current_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
const int64_t error_cost_threshold = max_error_cost();
if (error_cost_threshold == 0) {
return current_error_cost == 0 ? 100 : 0;
}
return 100 - std::min<int>(100 * current_error_cost / error_cost_threshold, 100);
}
int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) { int64_t CircuitBreaker::EmaErrorRecorder::UpdateLatency(int64_t latency) {
int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed); int64_t ema_latency = _ema_latency.load(butil::memory_order_relaxed);
do { do {
...@@ -129,7 +115,9 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost, ...@@ -129,7 +115,9 @@ bool CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
int64_t ema_error_cost = int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed); _ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
ema_error_cost += error_cost; ema_error_cost += error_cost;
return ema_error_cost <= max_error_cost(); const int64_t max_error_cost =
ema_latency * _window_size * (_max_error_percent / 100.0) * (1.0 + EPSILON);
return ema_error_cost <= max_error_cost;
} }
//Ordinary response //Ordinary response
...@@ -190,10 +178,6 @@ void CircuitBreaker::MarkAsBroken() { ...@@ -190,10 +178,6 @@ void CircuitBreaker::MarkAsBroken() {
} }
} }
int CircuitBreaker::health_score() const {
return std::min(_long_window.health_score(), _short_window.health_score());
}
void CircuitBreaker::UpdateIsolationDuration() { void CircuitBreaker::UpdateIsolationDuration() {
int64_t now_time_ms = butil::cpuwide_time_ms(); int64_t now_time_ms = butil::cpuwide_time_ms();
int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed); int isolation_duration_ms = _isolation_duration_ms.load(butil::memory_order_relaxed);
......
...@@ -46,10 +46,6 @@ public: ...@@ -46,10 +46,6 @@ public:
// only the first call will take effect. // only the first call will take effect.
void MarkAsBroken(); void MarkAsBroken();
// The closer to 100, the less recent errors occurred, and 0 means that
// it should be isolated.
int health_score() const;
// Number of times marked as broken // Number of times marked as broken
int isolated_times() const { int isolated_times() const {
return _isolated_times.load(butil::memory_order_relaxed); return _isolated_times.load(butil::memory_order_relaxed);
...@@ -70,7 +66,6 @@ private: ...@@ -70,7 +66,6 @@ private:
bool OnCallEnd(int error_code, int64_t latency); bool OnCallEnd(int error_code, int64_t latency);
void Reset(); void Reset();
int64_t max_error_cost() const;
int health_score() const; int health_score() const;
private: private:
......
...@@ -701,10 +701,9 @@ void Controller::Call::OnComplete( ...@@ -701,10 +701,9 @@ void Controller::Call::OnComplete(
stream_user_data = NULL; stream_user_data = NULL;
} }
if (sending_sock) { if (sending_sock != NULL) {
sending_sock->AddRequestCount();
if (error_code != 0) { if (error_code != 0) {
sending_sock->AddErrorCount(); sending_sock->AddRecentError();
} }
if (enable_circuit_breaker) { if (enable_circuit_breaker) {
......
...@@ -178,8 +178,7 @@ public: ...@@ -178,8 +178,7 @@ public:
CircuitBreaker circuit_breaker; CircuitBreaker circuit_breaker;
butil::atomic<uint64_t> acc_errors; butil::atomic<uint64_t> recent_error_count;
butil::atomic<uint64_t> acc_requests;
explicit SharedPart(SocketId creator_socket_id); explicit SharedPart(SocketId creator_socket_id);
~SharedPart(); ~SharedPart();
...@@ -197,8 +196,7 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2) ...@@ -197,8 +196,7 @@ Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
, out_size(0) , out_size(0)
, out_num_messages(0) , out_num_messages(0)
, extended_stat(NULL) , extended_stat(NULL)
, acc_errors(0) , recent_error_count(0) {
, acc_requests(0) {
} }
Socket::SharedPart::~SharedPart() { Socket::SharedPart::~SharedPart() {
...@@ -771,6 +769,7 @@ void Socket::Revive() { ...@@ -771,6 +769,7 @@ void Socket::Revive() {
SharedPart* sp = GetSharedPart(); SharedPart* sp = GetSharedPart();
if (sp) { if (sp) {
sp->circuit_breaker.Reset(); sp->circuit_breaker.Reset();
sp->recent_error_count.store(0, butil::memory_order_relaxed);
} }
// Set this flag to true since we add additional ref again // Set this flag to true since we add additional ref again
_recycle_flag.store(false, butil::memory_order_relaxed); _recycle_flag.store(false, butil::memory_order_relaxed);
...@@ -807,18 +806,27 @@ int Socket::ReleaseAdditionalReference() { ...@@ -807,18 +806,27 @@ int Socket::ReleaseAdditionalReference() {
return -1; return -1;
} }
void Socket::AddErrorCount() { void Socket::AddRecentError() {
SharedPart* sp = GetSharedPart(); SharedPart* sp = GetSharedPart();
if (sp) { if (sp) {
sp->acc_errors.fetch_add(1, butil::memory_order_relaxed); sp->recent_error_count.fetch_add(1, butil::memory_order_relaxed);
} }
} }
void Socket::AddRequestCount() { int64_t Socket::recent_error_count() const {
SharedPart* sp = GetSharedPart(); SharedPart* sp = GetSharedPart();
if (sp) { if (sp) {
sp->acc_requests.fetch_add(1, butil::memory_order_relaxed); return sp->recent_error_count.load(butil::memory_order_relaxed);
} }
return 0;
}
int Socket::isolated_times() const {
SharedPart* sp = GetSharedPart();
if (sp) {
return sp->circuit_breaker.isolated_times();
}
return 0;
} }
int Socket::SetFailed(int error_code, const char* error_fmt, ...) { int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
...@@ -2139,10 +2147,6 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ...@@ -2139,10 +2147,6 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\n in_num_messages=" << sp->in_num_messages.load(butil::memory_order_relaxed) << "\n in_num_messages=" << sp->in_num_messages.load(butil::memory_order_relaxed)
<< "\n out_size=" << sp->out_size.load(butil::memory_order_relaxed) << "\n out_size=" << sp->out_size.load(butil::memory_order_relaxed)
<< "\n out_num_messages=" << sp->out_num_messages.load(butil::memory_order_relaxed) << "\n out_num_messages=" << sp->out_num_messages.load(butil::memory_order_relaxed)
<< "\n health_score=" << sp->circuit_breaker.health_score()
<< "\n isolated_times=" << sp->circuit_breaker.isolated_times()
<< "\n acc_requests=" << sp->acc_requests.load(butil::memory_order_relaxed)
<< "\n acc_errors=" << sp->acc_errors.load(butil::memory_order_relaxed)
<< "\n}"; << "\n}";
} }
const int fd = ptr->_fd.load(butil::memory_order_relaxed); const int fd = ptr->_fd.load(butil::memory_order_relaxed);
......
...@@ -317,9 +317,11 @@ public: ...@@ -317,9 +317,11 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4))); __attribute__ ((__format__ (__printf__, 3, 4)));
static int SetFailed(SocketId id); static int SetFailed(SocketId id);
void AddErrorCount(); void AddRecentError();
void AddRequestCount(); int64_t recent_error_count() const;
int isolated_times() const;
void FeedbackCircuitBreaker(int error_code, int64_t latency_us); void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment