Commit b7e545c5 authored by zhujiashun's avatar zhujiashun Committed by gejun

reverse the order of OnStreamCreationDone and CleanupSocketForStream

parent 7a0d45a3
...@@ -695,7 +695,7 @@ inline bool does_error_affect_main_socket(int error_code) { ...@@ -695,7 +695,7 @@ inline bool does_error_affect_main_socket(int error_code) {
// this very Call (specified by |error_code|) rather than the error of the // this very Call (specified by |error_code|) rather than the error of the
// entire RPC (specified by c->FailedInline()). // entire RPC (specified by c->FailedInline()).
void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
bool responded) { bool responded, bool release_socket) {
switch (c->connection_type()) { switch (c->connection_type()) {
case CONNECTION_TYPE_UNKNOWN: case CONNECTION_TYPE_UNKNOWN:
break; break;
...@@ -778,6 +778,11 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, ...@@ -778,6 +778,11 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
} }
} }
void Controller::Call::OnCompleteAndKeepSocket(
Controller* c, int error_code, bool responded) {
OnComplete(c, error_code, responded, false);
}
void Controller::EndRPC(const CompletionInfo& info) { void Controller::EndRPC(const CompletionInfo& info) {
if (_timeout_id != 0) { if (_timeout_id != 0) {
bthread_timer_del(_timeout_id); bthread_timer_del(_timeout_id);
...@@ -792,11 +797,12 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -792,11 +797,12 @@ void Controller::EndRPC(const CompletionInfo& info) {
} }
// TODO: Replace this with stream_creator. // TODO: Replace this with stream_creator.
HandleStreamConnection(_current_call.sending_sock.get()); HandleStreamConnection(_current_call.sending_sock.get());
_current_call.OnCompleteAndKeepSocket(this, _error_code, info.responded);
if (_stream_creator) { if (_stream_creator) {
_stream_creator->OnStreamCreationDone( _stream_creator->OnStreamCreationDone(
_current_call.sending_sock, this); _current_call.sending_sock, this);
} }
_current_call.OnComplete(this, _error_code, info.responded); _current_call.sending_sock.reset(NULL);
if (_unfinished_call != NULL) { if (_unfinished_call != NULL) {
// When _current_call is successful, mark _unfinished_call as // When _current_call is successful, mark _unfinished_call as
...@@ -824,18 +830,20 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -824,18 +830,20 @@ void Controller::EndRPC(const CompletionInfo& info) {
} }
// TODO: Replace this with stream_creator. // TODO: Replace this with stream_creator.
HandleStreamConnection(_unfinished_call->sending_sock.get()); HandleStreamConnection(_unfinished_call->sending_sock.get());
if (_stream_creator) {
_stream_creator->OnStreamCreationDone(
_unfinished_call->sending_sock, this);
}
if (get_id(_unfinished_call->nretry) == info.id) { if (get_id(_unfinished_call->nretry) == info.id) {
_unfinished_call->OnComplete(this, _error_code, info.responded); _unfinished_call->OnCompleteAndKeepSocket(
this, _error_code, info.responded);
} else { } else {
CHECK(false) << "A previous non-backed-up call responded"; CHECK(false) << "A previous non-backed-up call responded";
_unfinished_call->OnComplete(this, ECANCELED, false); _unfinished_call->OnCompleteAndKeepSocket(this, ECANCELED, false);
} }
delete _unfinished_call; delete _unfinished_call;
_unfinished_call = NULL; _unfinished_call = NULL;
if (_stream_creator) {
_stream_creator->OnStreamCreationDone(
_unfinished_call->sending_sock, this);
}
_unfinished_call->sending_sock.reset(NULL);
} else { } else {
CHECK(false) << "A previous non-backed-up call responded"; CHECK(false) << "A previous non-backed-up call responded";
} }
......
...@@ -568,7 +568,8 @@ private: ...@@ -568,7 +568,8 @@ private:
Call(Call*); //move semantics Call(Call*); //move semantics
~Call(); ~Call();
void Reset(); void Reset();
void OnComplete(Controller* c, int error_code, bool responded); void OnComplete(Controller* c, int error_code, bool responded, bool release_socket = true);
void OnCompleteAndKeepSocket(Controller* c, int error_code, bool responded);
int nretry; // sent in nretry-th retry. int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback. bool need_feedback; // The LB needs feedback.
......
...@@ -1684,8 +1684,6 @@ void RtmpClientStream::CleanupSocketForStream( ...@@ -1684,8 +1684,6 @@ void RtmpClientStream::CleanupSocketForStream(
if (_from_socketmap) { if (_from_socketmap) {
_client_impl->socket_map().Remove(SocketMapKey(prev_sock->remote_side()), _client_impl->socket_map().Remove(SocketMapKey(prev_sock->remote_side()),
prev_sock->id()); prev_sock->id());
} else {
prev_sock->SetFailed(); // not necessary, already failed.
} }
} }
} }
......
...@@ -44,14 +44,14 @@ public: ...@@ -44,14 +44,14 @@ public:
// is the socket to the server interacted with. If the RPC was failed, // is the socket to the server interacted with. If the RPC was failed,
// `sending_sock' is prossibly NULL(fail before choosing a server). User // `sending_sock' is prossibly NULL(fail before choosing a server). User
// can own `sending_sock' and set the unique pointer to NULL, otherwise // can own `sending_sock' and set the unique pointer to NULL, otherwise
// the socket is cleaned-up by framework and then CleanupSocketForStream() // the socket is cleaned-up by framework.
virtual void OnStreamCreationDone(SocketUniquePtr& sending_sock, virtual void OnStreamCreationDone(SocketUniquePtr& sending_sock,
Controller* cntl) = 0; Controller* cntl) = 0;
// Called when one interation with the server completes. A RPC for // Called when one interation with the server completes. A RPC for
// creating a stream may interact with servers more than once. // creating a stream may interact with servers more than once.
// This method is paired with _each_ ReplaceSocketForStream(). // This method is paired with _each_ ReplaceSocketForStream().
// OnStreamCreationDone() is called _before_ last CleanupSocketForStream(), // OnStreamCreationDone() is called _after_ last CleanupSocketForStream(),
// If OnStreamCreationDone() moved the `sending_sock', `prev_sock' to this // If OnStreamCreationDone() moved the `sending_sock', `prev_sock' to this
// method is NULL. // method is NULL.
// Use `error_code' instead of cntl->ErrorCode(). // Use `error_code' instead of cntl->ErrorCode().
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment