Commit 1578278f authored by gejun's avatar gejun

Redesign StreamCreator to solve existing issues

parent 739172ae
......@@ -549,7 +549,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
&& info.id != _correlation_id && info.id != current_id()) {
// The call before backup request was failed.
if (_unfinished_call && get_id(_unfinished_call->nretry) == info.id) {
_unfinished_call->OnComplete(this, _error_code, info.responded);
_unfinished_call->OnComplete(this, _error_code, info.responded, false);
delete _unfinished_call;
_unfinished_call = NULL;
}
......@@ -612,7 +612,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
}
_accessed->Add(_current_call.peer_id);
}
_current_call.OnComplete(this, _error_code, info.responded);
_current_call.OnComplete(this, _error_code, info.responded, false);
++_current_call.nretry;
// Clear http responses before retrying, otherwise the response may
// be mixed with older (and undefined) stuff. This is actually not
......@@ -694,8 +694,8 @@ inline bool does_error_affect_main_socket(int error_code) {
// retries and backup requests. This method simply cares about the error of
// this very Call (specified by |error_code|) rather than the error of the
// entire RPC (specified by c->FailedInline()).
void Controller::Call::OnCompleteAndKeepSocket(
Controller* c, int error_code/*note*/, bool responded) {
void Controller::Call::OnComplete(
Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) {
switch (c->connection_type()) {
case CONNECTION_TYPE_UNKNOWN:
break;
......@@ -758,29 +758,24 @@ void Controller::Call::OnCompleteAndKeepSocket(
sock->SetLogOff();
}
}
if (touched_by_stream_creator) {
touched_by_stream_creator = false;
CHECK(c->stream_creator());
c->stream_creator()->CleanupSocketForStream(
sending_sock.get(), c, error_code);
}
if (enable_circuit_breaker && sending_sock) {
sending_sock->FeedbackCircuitBreaker(error_code,
butil::gettimeofday_us() - begin_time_us);
}
// Release the `Socket' we used to send/receive data
sending_sock.reset(NULL);
if (need_feedback) {
const LoadBalancer::CallInfo info =
{ begin_time_us, peer_id, error_code, c };
c->_lb->Feedback(info);
}
}
void Controller::Call::OnComplete(
Controller* c, int error_code, bool responded) {
OnCompleteAndKeepSocket(c, error_code, responded);
if (touched_by_stream_creator) {
touched_by_stream_creator = false;
CHECK(c->stream_creator());
c->stream_creator()->OnDestroyingStream(
sending_sock, c, error_code, end_of_rpc);
}
// Release the `Socket' we used to send/receive data
sending_sock.reset(NULL);
}
......@@ -799,12 +794,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
}
// TODO: Replace this with stream_creator.
HandleStreamConnection(_current_call.sending_sock.get());
_current_call.OnCompleteAndKeepSocket(this, _error_code, info.responded);
if (_stream_creator) {
_stream_creator->OnStreamCreationDone(
_current_call.sending_sock, this);
}
_current_call.sending_sock.reset(NULL);
_current_call.OnComplete(this, _error_code, info.responded, true);
if (_unfinished_call != NULL) {
// When _current_call is successful, mark _unfinished_call as
......@@ -815,7 +805,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
// same error. This is not accurate as well, but we have to end
// _unfinished_call with some sort of error anyway.
const int err = (_error_code == 0 ? EBACKUPREQUEST : _error_code);
_unfinished_call->OnComplete(this, err, false);
_unfinished_call->OnComplete(this, err, false, true);
delete _unfinished_call;
_unfinished_call = NULL;
}
......@@ -824,7 +814,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
// (which gets punished in LALB) for _current_call because _current_call
// is sent after _unfinished_call, it's just normal that _current_call
// does not respond before _unfinished_call.
_current_call.OnComplete(this, ECANCELED, false);
_current_call.OnComplete(this, ECANCELED, false, true);
if (_unfinished_call != NULL) {
if (_unfinished_call->sending_sock != NULL) {
_remote_side = _unfinished_call->sending_sock->remote_side();
......@@ -833,17 +823,12 @@ void Controller::EndRPC(const CompletionInfo& info) {
// TODO: Replace this with stream_creator.
HandleStreamConnection(_unfinished_call->sending_sock.get());
if (get_id(_unfinished_call->nretry) == info.id) {
_unfinished_call->OnCompleteAndKeepSocket(
this, _error_code, info.responded);
_unfinished_call->OnComplete(
this, _error_code, info.responded, true);
} else {
CHECK(false) << "A previous non-backed-up call responded";
_unfinished_call->OnCompleteAndKeepSocket(this, ECANCELED, false);
}
if (_stream_creator) {
_stream_creator->OnStreamCreationDone(
_unfinished_call->sending_sock, this);
_unfinished_call->OnComplete(this, ECANCELED, false, true);
}
_unfinished_call->sending_sock.reset(NULL);
delete _unfinished_call;
_unfinished_call = NULL;
......@@ -1016,7 +1001,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
}
if (_stream_creator) {
_current_call.touched_by_stream_creator = true;
_stream_creator->ReplaceSocketForStream(&tmp_sock, this);
_stream_creator->OnCreatingStream(&tmp_sock, this);
if (FailedInline()) {
return HandleSendFailed();
}
......
......@@ -249,7 +249,7 @@ public:
const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; }
// Attach a StreamCreator to this RPC. Notice that controller never deletes
// the StreamCreator, you can do the deletion inside OnStreamCreationDone.
// the StreamCreator, you can do the deletion inside OnDestroyingStream.
void set_stream_creator(StreamCreator* sc) { _stream_creator = sc; }
StreamCreator* stream_creator() const { return _stream_creator; }
......@@ -568,8 +568,7 @@ private:
Call(Call*); //move semantics
~Call();
void Reset();
void OnComplete(Controller* c, int error_code, bool responded);
void OnCompleteAndKeepSocket(Controller* c, int error_code, bool responded);
void OnComplete(Controller* c, int error_code, bool responded, bool end_of_rpc);
int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback.
......
......@@ -1320,11 +1320,14 @@ void H2UnsentRequest::Destroy() {
free(this);
}
void H2UnsentRequest::ReplaceSocketForStream(SocketUniquePtr*, Controller*) {
void H2UnsentRequest::OnCreatingStream(SocketUniquePtr*, Controller*) {
}
void H2UnsentRequest::OnStreamCreationDone(
SocketUniquePtr& sending_sock, Controller* cntl) {
void H2UnsentRequest::OnDestroyingStream(
SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) {
if (!end_of_rpc) {
return;
}
if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(_cntl, cntl);
_mutex.lock();
......@@ -1339,10 +1342,6 @@ void H2UnsentRequest::OnStreamCreationDone(
RemoveRefManually();
}
void H2UnsentRequest::CleanupSocketForStream(
Socket* prev_sock, Controller* cntl, int error_code) {
}
struct RemoveRefOnQuit {
RemoveRefOnQuit(H2UnsentRequest* msg) : _msg(msg) {}
~RemoveRefOnQuit() { _msg->RemoveRefManually(); }
......@@ -1660,7 +1659,7 @@ void PackH2Request(butil::IOBuf*,
}
}
void H2GlobalStreamCreator::ReplaceSocketForStream(
void H2GlobalStreamCreator::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) {
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
......@@ -1701,19 +1700,14 @@ void H2GlobalStreamCreator::ReplaceSocketForStream(
if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference();
}
return;
}
void H2GlobalStreamCreator::OnStreamCreationDone(
SocketUniquePtr& sending_sock, Controller* cntl) {
void H2GlobalStreamCreator::OnDestroyingStream(
SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) {
// If any error happens during the time of sending rpc, this function
// would be called. Currently just do nothing.
}
void H2GlobalStreamCreator::CleanupSocketForStream(
Socket* prev_sock, Controller* cntl, int error_code) {
}
StreamCreator* get_h2_global_stream_creator() {
return butil::get_leaky_singleton<H2GlobalStreamCreator>();
}
......
......@@ -90,10 +90,8 @@ public:
}
// @StreamCreator
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller* cntl,
int error_code);
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) override;
// @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
......@@ -218,10 +216,8 @@ void PackH2Request(butil::IOBuf* buf,
class H2GlobalStreamCreator : public StreamCreator {
protected:
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller* cntl,
int error_code);
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) override;
private:
butil::Mutex _mutex;
};
......
......@@ -3524,8 +3524,8 @@ void OnServerStreamCreated::Run(bool error,
break;
}
_stream->_message_stream_id = stream_id;
// client stream needs to be added here rather than OnStreamCreationDone
// to avoid the race between OnStreamCreationDone and a failed OnStatus,
// client stream needs to be added here rather than OnDestroyingStream
// to avoid the race between OnDestroyingStream and a failed OnStatus,
// because the former function runs in another bthread and may run later
// than OnStatus which needs to see the stream.
if (!ctx->AddClientStream(_stream.get())) {
......
......@@ -1678,17 +1678,7 @@ void RtmpClientStream::SignalError() {
}
}
void RtmpClientStream::CleanupSocketForStream(
Socket* prev_sock, Controller*, int /*error_code*/) {
if (prev_sock) {
if (_from_socketmap) {
_client_impl->socket_map().Remove(SocketMapKey(prev_sock->remote_side()),
prev_sock->id());
}
}
}
void RtmpClientStream::ReplaceSocketForStream(
void RtmpClientStream::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) {
{
std::unique_lock<butil::Mutex> mu(_state_mutex);
......@@ -1755,13 +1745,26 @@ void RtmpClientStream::OnFailedToCreateStream() {
return OnStopInternal();
}
void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
Controller* cntl) {
void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int /*error_code*/,
bool end_of_rpc) {
if (!end_of_rpc) {
if (sending_sock) {
if (_from_socketmap) {
_client_impl->socket_map().Remove(SocketMapKey(sending_sock->remote_side()),
sending_sock->id());
} else {
sending_sock->SetFailed(); // not necessary, already failed.
}
}
return;
}
// Always move sending_sock into _rtmpsock.
// - If the RPC is successful, moving sending_sock prevents it from
// setfailed in Controller after calling this method.
// - If the RPC is failed, OnStopInternal() can clean up the socket_map
// inserted in ReplaceSocketForStream().
// inserted in OnCreatingStream().
_rtmpsock.swap(sending_sock);
if (cntl->Failed()) {
......
......@@ -821,9 +821,8 @@ friend class RtmpRetryingClientStream;
int Publish(const butil::StringPiece& name, RtmpPublishType type);
// @StreamCreator
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller*, int error_code);
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr&, Controller*, int error_code, bool end_of_rpc) override;
void OnFailedToCreateStream();
......
......@@ -28,36 +28,33 @@ class Controller;
// generally this object is created before RPC and destroyed after RPC.
class StreamCreator {
public:
// Replace the socket in `inout' with another one (or keep as it is).
// remote_side() of the replaced socket must be same with *inout.
// Called each time before iteracting with a server. Notice that
// if the RPC has retries, this function is called before each retry.
// `cntl' contains necessary information about the RPC, if there's
// any error during replacement, call cntl->SetFailed().
// The replaced socket should take cntl->connection_type() into account
// since the framework will send request by the replaced socket directly
// when stream_creator is present.
virtual void ReplaceSocketForStream(SocketUniquePtr* inout,
Controller* cntl) = 0;
// `cntl' contains necessary information about the call. `sending_sock'
// is the socket to the server interacted with. If the RPC was failed,
// `sending_sock' is prossibly NULL(fail before choosing a server). User
// can own `sending_sock' and set the unique pointer to NULL, otherwise
// the socket is cleaned-up by framework.
virtual void OnStreamCreationDone(SocketUniquePtr& sending_sock,
Controller* cntl) = 0;
// Called when one interation with the server completes. A RPC for
// creating a stream may interact with servers more than once.
// This method is paired with _each_ ReplaceSocketForStream().
// OnStreamCreationDone() is called _after_ last CleanupSocketForStream(),
// If OnStreamCreationDone() moved the `sending_sock', `prev_sock' to this
// method is NULL.
// Use `error_code' instead of cntl->ErrorCode().
virtual void CleanupSocketForStream(Socket* prev_sock,
Controller* cntl,
int error_code) = 0;
// Called when the socket for sending request is about to be created.
// If the RPC has retries, this function is called before each retry.
// Params:
// inout: pointing to the socket to send requests by default,
// replaceable by user created ones (or keep as it is). remote_side()
// of the replaced socket must be same with the default socket.
// The replaced socket should take cntl->connection_type() into account
// since the framework sends request by the replaced socket directly
// when stream_creator is present.
// cntl: contains contexts of the RPC, if there's any error during
// replacement, call cntl->SetFailed().
virtual void OnCreatingStream(SocketUniquePtr* inout,
Controller* cntl) = 0;
// Called when the stream is about to destroyed.
// If the RPC has retries, this function is called before each retry.
// This method is always called even if OnCreatingStream() is not called.
// Params:
// sending_sock: The socket chosen by OnCreatingStream(), if OnCreatingStream
// is not called, the enclosed socket may be NULL.
// cntl: contexts of the RPC
// error_code: Use this instead of cntl->ErrorCode()
// end_of_rpc: true if the RPC is about to destroyed.
virtual void OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc) = 0;
};
} // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment