Commit 10e701b8 authored by zhujiashun's avatar zhujiashun Committed by gejun

Split StreamCreator into StreamUserData and StreamCreator to make

the responsibility of stream_data and stream_creator more clearer
parent f94fe364
...@@ -771,13 +771,15 @@ void Controller::Call::OnComplete( ...@@ -771,13 +771,15 @@ void Controller::Call::OnComplete(
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (c->stream_creator()) { if (stream_user_data) {
c->stream_creator()->OnDestroyingStream( stream_user_data->DestroyStreamUserData(sending_sock, c, error_code, end_of_rpc);
sending_sock, c, error_code, end_of_rpc, stream_user_data); stream_user_data = NULL;
}
if (end_of_rpc && c->stream_creator()) {
c->stream_creator()->DestroyStreamCreator(c);
} }
// Release the `Socket' we used to send/receive data // Release the `Socket' we used to send/receive data
sending_sock.reset(NULL); sending_sock.reset(NULL);
stream_user_data = NULL;
} }
void Controller::EndRPC(const CompletionInfo& info) { void Controller::EndRPC(const CompletionInfo& info) {
...@@ -792,9 +794,6 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -792,9 +794,6 @@ void Controller::EndRPC(const CompletionInfo& info) {
_remote_side = _current_call.sending_sock->remote_side(); _remote_side = _current_call.sending_sock->remote_side();
_local_side = _current_call.sending_sock->local_side(); _local_side = _current_call.sending_sock->local_side();
} }
// TODO: Replace this with stream_creator.
HandleStreamConnection(_current_call.sending_sock.get());
_current_call.OnComplete(this, _error_code, info.responded, true);
if (_unfinished_call != NULL) { if (_unfinished_call != NULL) {
// When _current_call is successful, mark _unfinished_call as // When _current_call is successful, mark _unfinished_call as
...@@ -805,16 +804,19 @@ void Controller::EndRPC(const CompletionInfo& info) { ...@@ -805,16 +804,19 @@ void Controller::EndRPC(const CompletionInfo& info) {
// same error. This is not accurate as well, but we have to end // same error. This is not accurate as well, but we have to end
// _unfinished_call with some sort of error anyway. // _unfinished_call with some sort of error anyway.
const int err = (_error_code == 0 ? EBACKUPREQUEST : _error_code); const int err = (_error_code == 0 ? EBACKUPREQUEST : _error_code);
_unfinished_call->OnComplete(this, err, false, true); _unfinished_call->OnComplete(this, err, false, false);
delete _unfinished_call; delete _unfinished_call;
_unfinished_call = NULL; _unfinished_call = NULL;
} }
// TODO: Replace this with stream_creator.
HandleStreamConnection(_current_call.sending_sock.get());
_current_call.OnComplete(this, _error_code, info.responded, true);
} else { } else {
// Even if _unfinished_call succeeded, we don't use EBACKUPREQUEST // Even if _unfinished_call succeeded, we don't use EBACKUPREQUEST
// (which gets punished in LALB) for _current_call because _current_call // (which gets punished in LALB) for _current_call because _current_call
// is sent after _unfinished_call, it's just normal that _current_call // is sent after _unfinished_call, it's just normal that _current_call
// does not respond before _unfinished_call. // does not respond before _unfinished_call.
_current_call.OnComplete(this, ECANCELED, false, true); _current_call.OnComplete(this, ECANCELED, false, false);
if (_unfinished_call != NULL) { if (_unfinished_call != NULL) {
if (_unfinished_call->sending_sock != NULL) { if (_unfinished_call->sending_sock != NULL) {
_remote_side = _unfinished_call->sending_sock->remote_side(); _remote_side = _unfinished_call->sending_sock->remote_side();
......
...@@ -581,7 +581,7 @@ private: ...@@ -581,7 +581,7 @@ private:
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
// socket fetched from socket pool // socket fetched from socket pool
SocketUniquePtr sending_sock; SocketUniquePtr sending_sock;
void* stream_user_data; StreamUserData* stream_user_data;
}; };
void HandleStreamConnection(Socket *host_socket); void HandleStreamConnection(Socket *host_socket);
......
...@@ -67,7 +67,7 @@ public: ...@@ -67,7 +67,7 @@ public:
_cntl->_current_call.sending_sock.reset(ptr.release()); _cntl->_current_call.sending_sock.reset(ptr.release());
} }
void* get_stream_user_data() { StreamUserData* get_stream_user_data() {
return _cntl->_current_call.stream_user_data; return _cntl->_current_call.stream_user_data;
} }
......
...@@ -1330,7 +1330,10 @@ private: ...@@ -1330,7 +1330,10 @@ private:
H2UnsentRequest* _msg; H2UnsentRequest* _msg;
}; };
void H2UnsentRequest::Discard(SocketUniquePtr& sending_sock, Controller* cntl) { void H2UnsentRequest::DestroyStreamUserData(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc) {
RemoveRefOnQuit deref_self(this); RemoveRefOnQuit deref_self(this);
if (sending_sock != NULL && cntl->ErrorCode() != 0) { if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(cntl, _cntl); CHECK_EQ(cntl, _cntl);
...@@ -1341,7 +1344,6 @@ void H2UnsentRequest::Discard(SocketUniquePtr& sending_sock, Controller* cntl) { ...@@ -1341,7 +1344,6 @@ void H2UnsentRequest::Discard(SocketUniquePtr& sending_sock, Controller* cntl) {
ctx->AddAbandonedStream(_stream_id); ctx->AddAbandonedStream(_stream_id);
} }
} }
RemoveRefManually();
} }
// bvar::Adder<int64_t> g_append_request_time; // bvar::Adder<int64_t> g_append_request_time;
...@@ -1635,7 +1637,8 @@ void PackH2Request(butil::IOBuf*, ...@@ -1635,7 +1637,8 @@ void PackH2Request(butil::IOBuf*,
header->SetHeader("Authorization", auth_data); header->SetHeader("Authorization", auth_data);
} }
H2UnsentRequest* h2_req = (H2UnsentRequest*)accessor.get_stream_user_data(); H2UnsentRequest* h2_req = dynamic_cast<H2UnsentRequest*>(accessor.get_stream_user_data());
CHECK(h2_req);
h2_req->AddRefManually(); // add ref for AppendAndDestroySelf h2_req->AddRefManually(); // add ref for AppendAndDestroySelf
h2_req->_sctx->set_correlation_id(correlation_id); h2_req->_sctx->set_correlation_id(correlation_id);
*user_message = h2_req; *user_message = h2_req;
...@@ -1647,29 +1650,16 @@ void PackH2Request(butil::IOBuf*, ...@@ -1647,29 +1650,16 @@ void PackH2Request(butil::IOBuf*,
} }
} }
void* H2GlobalStreamCreator::OnCreatingStream( StreamUserData* H2GlobalStreamCreator::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) { SocketUniquePtr* inout, Controller* cntl) {
// Although the critical section looks huge, it should rarely be contended // Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending. // since timeout of RPC is much larger than the delay of sending.
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
bool need_create_agent = true; SocketUniquePtr& agent_sock = (*inout)->_agent_socket;
do { if (!agent_sock || agent_sock->Failed() ||
if (!(*inout)->_agent_socket || (agent_sock->parsing_context() &&
(*inout)->_agent_socket->Failed()) { static_cast<H2Context*>(agent_sock->parsing_context())->RunOutStreams())) {
break; // Create a new agent socket
}
H2Context* ctx = static_cast<H2Context*>((*inout)->_agent_socket->parsing_context());
// According to https://httpwg.org/specs/rfc7540.html#StreamIdentifiers:
// A client that is unable to establish a new stream identifier can establish
// a new connection for new streams.
if (ctx && ctx->RunOutStreams()) {
break;
}
(*inout)->_agent_socket->ReAddress(inout);
need_create_agent = false;
} while (0);
if (need_create_agent) {
SocketId sid; SocketId sid;
SocketOptions opt = (*inout)->_options; SocketOptions opt = (*inout)->_options;
opt.health_check_interval_s = -1; opt.health_check_interval_s = -1;
...@@ -1690,6 +1680,8 @@ void* H2GlobalStreamCreator::OnCreatingStream( ...@@ -1690,6 +1680,8 @@ void* H2GlobalStreamCreator::OnCreatingStream(
if (tmp_ptr) { if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference(); tmp_ptr->ReleaseAdditionalReference();
} }
} else {
agent_sock->ReAddress(inout);
} }
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl); H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl);
...@@ -1697,19 +1689,12 @@ void* H2GlobalStreamCreator::OnCreatingStream( ...@@ -1697,19 +1689,12 @@ void* H2GlobalStreamCreator::OnCreatingStream(
cntl->SetFailed(ENOMEM, "Fail to create H2UnsentRequest"); cntl->SetFailed(ENOMEM, "Fail to create H2UnsentRequest");
return NULL; return NULL;
} }
return (void*)h2_req; return h2_req;
} }
void H2GlobalStreamCreator::OnDestroyingStream(SocketUniquePtr& sending_sock, void H2GlobalStreamCreator::DestroyStreamCreator(Controller* cntl) {
Controller* cntl, // H2GlobalStreamCreator is a global singleton value.
int error_code, // Don't delete it in this function.
bool end_of_rpc,
void* stream_user_data) {
// [stream_user_data == NULL] means that RPC has failed before it is created.
H2UnsentRequest* h2_req = static_cast<H2UnsentRequest*>(stream_user_data);
if (h2_req) {
h2_req->Discard(sending_sock, cntl);
}
} }
StreamCreator* get_h2_global_stream_creator() { StreamCreator* get_h2_global_stream_creator() {
......
...@@ -88,7 +88,7 @@ inline Http2Bvars* get_http2_bvars() { ...@@ -88,7 +88,7 @@ inline Http2Bvars* get_http2_bvars() {
return butil::get_leaky_singleton<Http2Bvars>(); return butil::get_leaky_singleton<Http2Bvars>();
} }
class H2UnsentRequest : public SocketMessage { class H2UnsentRequest : public SocketMessage, public StreamUserData {
friend void PackH2Request(butil::IOBuf*, SocketMessage**, friend void PackH2Request(butil::IOBuf*, SocketMessage**,
uint64_t, const google::protobuf::MethodDescriptor*, uint64_t, const google::protobuf::MethodDescriptor*,
Controller*, const butil::IOBuf&, const Authenticator*); Controller*, const butil::IOBuf&, const Authenticator*);
...@@ -107,10 +107,14 @@ public: ...@@ -107,10 +107,14 @@ public:
} }
// @SocketMessage // @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*); butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*) override;
size_t EstimatedByteSize(); size_t EstimatedByteSize() override;
void Discard(SocketUniquePtr& sending_sock, Controller* cntl); // @StreamUserData
void DestroyStreamUserData(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc) override;
private: private:
std::string& push(const std::string& name) std::string& push(const std::string& name)
...@@ -235,12 +239,8 @@ void PackH2Request(butil::IOBuf* buf, ...@@ -235,12 +239,8 @@ void PackH2Request(butil::IOBuf* buf,
class H2GlobalStreamCreator : public StreamCreator { class H2GlobalStreamCreator : public StreamCreator {
protected: protected:
void* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override; StreamUserData* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock, void DestroyStreamCreator(Controller* cntl) override;
Controller* cntl,
int error_code,
bool end_of_rpc,
void* stream_user_data) override;
private: private:
butil::Mutex _mutex; butil::Mutex _mutex;
}; };
......
...@@ -1678,7 +1678,7 @@ void RtmpClientStream::SignalError() { ...@@ -1678,7 +1678,7 @@ void RtmpClientStream::SignalError() {
} }
} }
void* RtmpClientStream::OnCreatingStream( StreamUserData* RtmpClientStream::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) { SocketUniquePtr* inout, Controller* cntl) {
{ {
std::unique_lock<butil::Mutex> mu(_state_mutex); std::unique_lock<butil::Mutex> mu(_state_mutex);
...@@ -1711,7 +1711,7 @@ void* RtmpClientStream::OnCreatingStream( ...@@ -1711,7 +1711,7 @@ void* RtmpClientStream::OnCreatingStream(
<< ", main socketId=" << (*inout)->id(); << ", main socketId=" << (*inout)->id();
tmp_ptr->ShareStats(inout->get()); tmp_ptr->ShareStats(inout->get());
inout->reset(tmp_ptr.release()); inout->reset(tmp_ptr.release());
return NULL; return this;
} }
int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) { int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
...@@ -1746,11 +1746,10 @@ void RtmpClientStream::OnFailedToCreateStream() { ...@@ -1746,11 +1746,10 @@ void RtmpClientStream::OnFailedToCreateStream() {
return OnStopInternal(); return OnStopInternal();
} }
void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock, void RtmpClientStream::DestroyStreamUserData(SocketUniquePtr& sending_sock,
Controller* cntl, Controller* cntl,
int /*error_code*/, int error_code,
bool end_of_rpc, bool end_of_rpc) {
void* /*stream_data*/) {
if (!end_of_rpc) { if (!end_of_rpc) {
if (sending_sock) { if (sending_sock) {
if (_from_socketmap) { if (_from_socketmap) {
...@@ -1760,15 +1759,18 @@ void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock, ...@@ -1760,15 +1759,18 @@ void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock,
sending_sock->SetFailed(); // not necessary, already failed. sending_sock->SetFailed(); // not necessary, already failed.
} }
} }
return; } else {
}
// Always move sending_sock into _rtmpsock. // Always move sending_sock into _rtmpsock.
// - If the RPC is successful, moving sending_sock prevents it from // - If the RPC is successful, moving sending_sock prevents it from
// setfailed in Controller after calling this method. // setfailed in Controller after calling this method.
// - If the RPC is failed, OnStopInternal() can clean up the socket_map // - If the RPC is failed, OnStopInternal() can clean up the socket_map
// inserted in OnCreatingStream(). // inserted in OnCreatingStream().
_rtmpsock.swap(sending_sock); _rtmpsock.swap(sending_sock);
}
}
void RtmpClientStream::DestroyStreamCreator(Controller* cntl) {
if (cntl->Failed()) { if (cntl->Failed()) {
if (_rtmpsock != NULL && if (_rtmpsock != NULL &&
// ^ If sending_sock is NULL, the RPC fails before _pack_request // ^ If sending_sock is NULL, the RPC fails before _pack_request
......
...@@ -780,7 +780,8 @@ struct RtmpClientStreamOptions { ...@@ -780,7 +780,8 @@ struct RtmpClientStreamOptions {
// Represent a "NetStream" in AS. Multiple streams can be multiplexed // Represent a "NetStream" in AS. Multiple streams can be multiplexed
// into one TCP connection. // into one TCP connection.
class RtmpClientStream : public RtmpStreamBase class RtmpClientStream : public RtmpStreamBase
, public StreamCreator { , public StreamCreator
, public StreamUserData {
public: public:
RtmpClientStream(); RtmpClientStream();
...@@ -821,9 +822,14 @@ friend class RtmpRetryingClientStream; ...@@ -821,9 +822,14 @@ friend class RtmpRetryingClientStream;
int Publish(const butil::StringPiece& name, RtmpPublishType type); int Publish(const butil::StringPiece& name, RtmpPublishType type);
// @StreamCreator // @StreamCreator
void* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override; StreamUserData* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr&, Controller*, int error_code, void DestroyStreamCreator(Controller* cntl) override;
bool end_of_rpc, void*) override;
// @StreamUserData
void DestroyStreamUserData(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc) override;
void OnFailedToCreateStream(); void OnFailedToCreateStream();
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
namespace brpc { namespace brpc {
class Controller; class Controller;
class StreamUserData;
// Abstract creation of "user-level connection" over a RPC-like process. // Abstract creation of "user-level connection" over a RPC-like process.
// Lifetime of this object should be guaranteed by user during the RPC, // Lifetime of this object should be guaranteed by user during the RPC,
...@@ -40,23 +41,34 @@ public: ...@@ -40,23 +41,34 @@ public:
// when stream_creator is present. // when stream_creator is present.
// cntl: contains contexts of the RPC, if there's any error during // cntl: contains contexts of the RPC, if there's any error during
// replacement, call cntl->SetFailed(). // replacement, call cntl->SetFailed().
virtual void* OnCreatingStream(SocketUniquePtr* inout, virtual StreamUserData* OnCreatingStream(SocketUniquePtr* inout,
Controller* cntl) = 0; Controller* cntl) = 0;
// Called when the stream is about to destroyed. // Called when the StreamCreator is about to destroyed.
// If the RPC has retries, this function MUST be called before each retry. // This function MUST be called only once at the end of successful RPC
// Call to recycle resources.
// Params: // Params:
// sending_sock: The socket chosen by OnCreatingStream(), if OnCreatingStream // cntl: contexts of the RPC
// is not called, the enclosed socket may be NULL. virtual void DestroyStreamCreator(Controller* cntl) = 0;
};
// The Intermediate user data created by StreamCreator to record the context
// of a specific stream request.
class StreamUserData {
public:
// Called when the streamUserData is about to destroyed.
// This function MUST be called to clean up resources if OnCreatingStream
// of StreamCreator has returned a valid StreamUserData pointer.
// Params:
// sending_sock: The socket chosen by OnCreatingStream(), if an error
// happens during choosing, the enclosed socket is NULL.
// cntl: contexts of the RPC // cntl: contexts of the RPC
// error_code: Use this instead of cntl->ErrorCode() // error_code: Use this instead of cntl->ErrorCode()
// end_of_rpc: true if the RPC is about to destroyed. // end_of_rpc: true if the RPC is about to destroyed.
// stream_user_data: the corresponding user data of this very stream virtual void DestroyStreamUserData(SocketUniquePtr& sending_sock,
virtual void OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl, Controller* cntl,
int error_code, int error_code,
bool end_of_rpc, bool end_of_rpc) = 0;
void* stream_user_data) = 0;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -837,7 +837,7 @@ TEST(RtmpTest, retrying_stream) { ...@@ -837,7 +837,7 @@ TEST(RtmpTest, retrying_stream) {
LOG(INFO) << "Stopping server"; LOG(INFO) << "Stopping server";
server.Stop(0); server.Stop(0);
server.Join(); server.Join();
LOG(INFO) << "Stopped server and sleep for awhile"; LOG(INFO) << "Stopped server and sleep for a while";
sleep(3); sleep(3);
ASSERT_EQ(0, server.Start(8576, &server_opt)); ASSERT_EQ(0, server.Start(8576, &server_opt));
sleep(3); sleep(3);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment