Commit 6cf925c1 authored by gejun's avatar gejun

Fix a bug in Controlle.OnVersionedRPCReturned that successful response of…

Fix a bug in Controlle.OnVersionedRPCReturned that successful response of non-backup request should be ignored
parent 8049aeb5
......@@ -67,6 +67,7 @@ BAIDU_REGISTER_ERRNO(brpc::ERTMPCREATESTREAM, "createStream was rejected by the
BAIDU_REGISTER_ERRNO(brpc::EEOF, "Got EOF");
BAIDU_REGISTER_ERRNO(brpc::EUNUSED, "The socket was not needed");
BAIDU_REGISTER_ERRNO(brpc::ESSL, "SSL related operation failed");
BAIDU_REGISTER_ERRNO(brpc::EH2RUNOUTSTREAMS, "The H2 socket was run out of streams");
BAIDU_REGISTER_ERRNO(brpc::EINTERNAL, "General internal error");
BAIDU_REGISTER_ERRNO(brpc::ERESPONSE, "Bad response");
......@@ -238,7 +239,7 @@ void Controller::InternalReset(bool in_constructor) {
_done = NULL;
_sender = NULL;
_request_code = 0;
_single_server_id = (SocketId)-1;
_single_server_id = INVALID_SOCKET_ID;
_unfinished_call = NULL;
_stream_creator = NULL;
_accessed = NULL;
......@@ -266,7 +267,7 @@ Controller::Call::Call(Controller::Call* rhs)
// setting all the fields to next call and _current_call.OnComplete
// will behave incorrectly.
rhs->need_feedback = false;
rhs->peer_id = (SocketId)-1;
rhs->peer_id = INVALID_SOCKET_ID;
rhs->stream_user_data = NULL;
}
......@@ -278,8 +279,7 @@ void Controller::Call::Reset() {
nretry = 0;
need_feedback = false;
enable_circuit_breaker = false;
touched_by_stream_creator = false;
peer_id = (SocketId)-1;
peer_id = INVALID_SOCKET_ID;
begin_time_us = 0;
sending_sock.reset(NULL);
stream_user_data = NULL;
......@@ -544,20 +544,26 @@ static void HandleTimeout(void* arg) {
void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
bool new_bthread, int saved_error) {
// Intercept errors from previous calls because handling these errors
// is quick and does not need new thread.
if (FailedInline()
&& info.id != _correlation_id && info.id != current_id()) {
// The call before backup request was failed.
// TODO(gejun): Simplify call-ending code.
// Intercept previous calls
while (info.id != _correlation_id && info.id != current_id()) {
if (_unfinished_call && get_id(_unfinished_call->nretry) == info.id) {
if (!FailedInline()) {
// Continue with successful backup request.
break;
}
// Complete failed backup request.
_unfinished_call->OnComplete(this, _error_code, info.responded, false);
delete _unfinished_call;
_unfinished_call = NULL;
}
}
// Ignore all non-backup requests and failed backup requests.
_error_code = saved_error;
response_attachment().clear();
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}
if ((!_error_code && _retry_policy == NULL) ||
_current_call.nretry >= _max_retry) {
goto END_OF_RPC;
......@@ -814,6 +820,13 @@ void Controller::EndRPC(const CompletionInfo& info) {
// (which gets punished in LALB) for _current_call because _current_call
// is sent after _unfinished_call, it's just normal that _current_call
// does not respond before _unfinished_call.
if (_unfinished_call == NULL) {
CHECK(false) << "A previous non-backup request responded, cid="
<< info.id << " current_cid=" << current_id()
<< " initial_cid=" << _correlation_id
<< " stream_user_data=" << _current_call.stream_user_data
<< " sending_sock=" << *_current_call.sending_sock;
}
_current_call.OnComplete(this, ECANCELED, false, false);
if (_unfinished_call != NULL) {
if (_unfinished_call->sending_sock != NULL) {
......@@ -826,14 +839,12 @@ void Controller::EndRPC(const CompletionInfo& info) {
_unfinished_call->OnComplete(
this, _error_code, info.responded, true);
} else {
CHECK(false) << "A previous non-backed-up call responded";
CHECK(false) << "A previous non-backup request responded";
_unfinished_call->OnComplete(this, ECANCELED, false, true);
}
delete _unfinished_call;
_unfinished_call = NULL;
} else {
CHECK(false) << "A previous non-backed-up call responded";
}
}
if (_stream_creator) {
......@@ -1004,7 +1015,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
}
if (_stream_creator) {
_current_call.stream_user_data =
_stream_creator->OnCreatingStream(&tmp_sock, this);
_stream_creator->OnCreatingStream(&tmp_sock, this);
if (FailedInline()) {
return HandleSendFailed();
}
......@@ -1036,9 +1047,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
} else {
int rc = 0;
if (_connection_type == CONNECTION_TYPE_POOLED) {
rc = Socket::GetPooledSocket(tmp_sock.get(), &_current_call.sending_sock);
rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock);
} else if (_connection_type == CONNECTION_TYPE_SHORT) {
rc = Socket::GetShortSocket(tmp_sock.get(), &_current_call.sending_sock);
rc = tmp_sock->GetShortSocket(&_current_call.sending_sock);
} else {
tmp_sock.reset();
SetFailed(EINVAL, "Invalid connection_type=%d", (int)_connection_type);
......
......@@ -552,11 +552,12 @@ private:
CallId id = { _correlation_id.value + nretry + 1 };
return id;
}
public:
CallId current_id() const {
CallId id = { _correlation_id.value + _current_call.nretry + 1 };
return id;
}
private:
// Append server information to `_error_text'
void AppendServerIdentiy();
......@@ -586,7 +587,7 @@ private:
void HandleStreamConnection(Socket *host_socket);
bool SingleServer() const { return _single_server_id != (SocketId)-1; }
bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID; }
void SubmitSpan();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment