Commit dae91f83 authored by helei's avatar helei

enable circuit breaker for backup request

parent ed9d8d26
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
// Zhangyi Chen(chenzhangyi01@baidu.com) // Zhangyi Chen(chenzhangyi01@baidu.com)
#include <signal.h> #include <signal.h>
#include <openssl/md5.h> #include <openssl/md5.h>
#include <google/protobuf/descriptor.h> #include <google/protobuf/descriptor.h>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "bthread/bthread.h" #include "bthread/bthread.h"
...@@ -260,6 +260,7 @@ void Controller::ResetPods() { ...@@ -260,6 +260,7 @@ void Controller::ResetPods() {
Controller::Call::Call(Controller::Call* rhs) Controller::Call::Call(Controller::Call* rhs)
: nretry(rhs->nretry) : nretry(rhs->nretry)
, need_feedback(rhs->need_feedback) , need_feedback(rhs->need_feedback)
, enable_circuit_breaker(rhs->enable_circuit_breaker)
, peer_id(rhs->peer_id) , peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us) , begin_time_us(rhs->begin_time_us)
, sending_sock(rhs->sending_sock.release()) , sending_sock(rhs->sending_sock.release())
...@@ -348,7 +349,7 @@ void Controller::AppendServerIdentiy() { ...@@ -348,7 +349,7 @@ void Controller::AppendServerIdentiy() {
_error_text.reserve(_error_text.size() + MD5_DIGEST_LENGTH * 2 + 2); _error_text.reserve(_error_text.size() + MD5_DIGEST_LENGTH * 2 + 2);
_error_text.push_back('['); _error_text.push_back('[');
char ipbuf[64]; char ipbuf[64];
int len = snprintf(ipbuf, sizeof(ipbuf), "%s:%d", int len = snprintf(ipbuf, sizeof(ipbuf), "%s:%d",
butil::my_ip_cstr(), _server->listen_address().port); butil::my_ip_cstr(), _server->listen_address().port);
unsigned char digest[MD5_DIGEST_LENGTH]; unsigned char digest[MD5_DIGEST_LENGTH];
MD5((const unsigned char*)ipbuf, len, digest); MD5((const unsigned char*)ipbuf, len, digest);
...@@ -507,7 +508,7 @@ void Controller::NotifyOnCancel(google::protobuf::Closure* callback) { ...@@ -507,7 +508,7 @@ void Controller::NotifyOnCancel(google::protobuf::Closure* callback) {
LOG(WARNING) << "Parameter `callback' is NLLL"; LOG(WARNING) << "Parameter `callback' is NLLL";
return; return;
} }
ClosureGuard guard(callback); ClosureGuard guard(callback);
if (_oncancel_id != INVALID_BTHREAD_ID) { if (_oncancel_id != INVALID_BTHREAD_ID) {
LOG(FATAL) << "NotifyCancel a single call more than once!"; LOG(FATAL) << "NotifyCancel a single call more than once!";
...@@ -627,7 +628,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, ...@@ -627,7 +628,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
response_attachment().clear(); response_attachment().clear();
return IssueRPC(butil::gettimeofday_us()); return IssueRPC(butil::gettimeofday_us());
} }
END_OF_RPC: END_OF_RPC:
if (new_bthread) { if (new_bthread) {
// [ Essential for -usercode_in_pthread=true ] // [ Essential for -usercode_in_pthread=true ]
...@@ -710,7 +711,7 @@ void Controller::Call::OnComplete( ...@@ -710,7 +711,7 @@ void Controller::Call::OnComplete(
} }
if (enable_circuit_breaker) { if (enable_circuit_breaker) {
sending_sock->FeedbackCircuitBreaker(error_code, sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us); butil::gettimeofday_us() - begin_time_us);
} }
} }
...@@ -877,7 +878,7 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -877,7 +878,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
const CallId saved_cid = _correlation_id; const CallId saved_cid = _correlation_id;
if (_done) { if (_done) {
if (!FLAGS_usercode_in_pthread || _done == DoNothing()/*Note*/) { if (!FLAGS_usercode_in_pthread || _done == DoNothing()/*Note*/) {
// Note: no need to run DoNothing in backup thread when pthread // Note: no need to run DoNothing in backup thread when pthread
// mode is on. Otherwise there's a tricky deadlock: // mode is on. Otherwise there's a tricky deadlock:
// void SomeService::CallMethod(...) { // -usercode_in_pthread=true // void SomeService::CallMethod(...) { // -usercode_in_pthread=true
// ... // ...
...@@ -887,7 +888,7 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -887,7 +888,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
// } // }
// Join is not signalled when the done does not Run() and the done // Join is not signalled when the done does not Run() and the done
// can't Run() because all backup threads are blocked by Join(). // can't Run() because all backup threads are blocked by Join().
OnRPCEnd(butil::gettimeofday_us()); OnRPCEnd(butil::gettimeofday_us());
const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE); const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE);
_done->Run(); _done->Run();
...@@ -1292,7 +1293,7 @@ void Controller::HandleStreamConnection(Socket *host_socket) { ...@@ -1292,7 +1293,7 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
if (_request_stream == INVALID_STREAM_ID) { if (_request_stream == INVALID_STREAM_ID) {
CHECK(!has_remote_stream()); CHECK(!has_remote_stream());
return; return;
} }
SocketUniquePtr ptr; SocketUniquePtr ptr;
if (!FailedInline()) { if (!FailedInline()) {
if (Socket::Address(_request_stream, &ptr) != 0) { if (Socket::Address(_request_stream, &ptr) != 0) {
...@@ -1309,7 +1310,7 @@ void Controller::HandleStreamConnection(Socket *host_socket) { ...@@ -1309,7 +1310,7 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
if (FailedInline()) { if (FailedInline()) {
Stream::SetFailed(_request_stream); Stream::SetFailed(_request_stream);
if (_remote_stream_settings != NULL) { if (_remote_stream_settings != NULL) {
policy::SendStreamRst(host_socket, policy::SendStreamRst(host_socket,
_remote_stream_settings->stream_id()); _remote_stream_settings->stream_id());
} }
return; return;
...@@ -1363,7 +1364,7 @@ Controller::CreateProgressiveAttachment(StopStyle stop_style) { ...@@ -1363,7 +1364,7 @@ Controller::CreateProgressiveAttachment(StopStyle stop_style) {
} }
SocketUniquePtr httpsock; SocketUniquePtr httpsock;
_current_call.sending_sock->ReAddress(&httpsock); _current_call.sending_sock->ReAddress(&httpsock);
if (stop_style == FORCE_STOP) { if (stop_style == FORCE_STOP) {
httpsock->fail_me_at_server_stop(); httpsock->fail_me_at_server_stop();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment