Commit f94fe364 authored by zhujiashun's avatar zhujiashun Committed by gejun

Make OnCreatingStream return stream_user_data making code more readable

parent 9d445c3c
......@@ -733,8 +733,10 @@ void Controller::Call::OnComplete(
case CONNECTION_TYPE_SHORT:
if (sending_sock != NULL) {
// Check the comment in CONNECTION_TYPE_POOLED branch.
if (!sending_sock->is_read_progressive() && c->_stream_creator == NULL) {
sending_sock->SetFailed();
if (!sending_sock->is_read_progressive()) {
if (c->_stream_creator == NULL) {
sending_sock->SetFailed();
}
} else {
sending_sock->OnProgressiveReadCompleted();
}
......@@ -998,7 +1000,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_remote_side = tmp_sock->remote_side();
}
if (_stream_creator) {
_stream_creator->OnCreatingStream(&tmp_sock, this);
_current_call.stream_user_data =
_stream_creator->OnCreatingStream(&tmp_sock, this);
if (FailedInline()) {
return HandleSendFailed();
}
......
......@@ -581,7 +581,7 @@ private:
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
// socket fetched from socket pool
SocketUniquePtr sending_sock;
StreamUserData* stream_user_data;
void* stream_user_data;
};
void HandleStreamConnection(Socket *host_socket);
......
......@@ -67,11 +67,10 @@ public:
_cntl->_current_call.sending_sock.reset(ptr.release());
}
ControllerPrivateAccessor &set_stream_user_data(StreamUserData* ptr){
_cntl->_current_call.stream_user_data = ptr;
return *this;
void* get_stream_user_data() {
return _cntl->_current_call.stream_user_data;
}
ControllerPrivateAccessor &set_security_mode(bool security_mode) {
_cntl->set_flag(Controller::FLAGS_SECURITY_MODE, security_mode);
return *this;
......
......@@ -1232,7 +1232,7 @@ static void PackH2Message(butil::IOBuf* out,
}
}
H2UnsentRequest* H2UnsentRequest::New(Controller* c, uint64_t correlation_id) {
H2UnsentRequest* H2UnsentRequest::New(Controller* c) {
const HttpHeader& h = c->http_request();
const CommonStrings* const common = get_common_strings();
const bool need_content_length = (h.method() != HTTP_METHOD_GET);
......@@ -1311,7 +1311,6 @@ H2UnsentRequest* H2UnsentRequest::New(Controller* c, uint64_t correlation_id) {
val->append(encoded_user_info);
}
msg->_sctx.reset(new H2StreamContext);
msg->_sctx->set_correlation_id(correlation_id);
return msg;
}
......@@ -1331,7 +1330,7 @@ private:
H2UnsentRequest* _msg;
};
void H2UnsentRequest::OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl) {
void H2UnsentRequest::Discard(SocketUniquePtr& sending_sock, Controller* cntl) {
RemoveRefOnQuit deref_self(this);
if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(cntl, _cntl);
......@@ -1342,6 +1341,7 @@ void H2UnsentRequest::OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl)
ctx->AddAbandonedStream(_stream_id);
}
}
RemoveRefManually();
}
// bvar::Adder<int64_t> g_append_request_time;
......@@ -1635,13 +1635,9 @@ void PackH2Request(butil::IOBuf*,
header->SetHeader("Authorization", auth_data);
}
// Serialize http2 request
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl, correlation_id);
if (!h2_req) {
return cntl->SetFailed(ENOMEM, "Fail to create H2UnsentRequest");
}
h2_req->AddRefManually(); // add ref for H2UnsentRequest::OnDestroy
accessor.set_stream_user_data(h2_req);
H2UnsentRequest* h2_req = (H2UnsentRequest*)accessor.get_stream_user_data();
h2_req->AddRefManually(); // add ref for AppendAndDestroySelf
h2_req->_sctx->set_correlation_id(correlation_id);
*user_message = h2_req;
if (FLAGS_http_verbose) {
......@@ -1651,11 +1647,12 @@ void PackH2Request(butil::IOBuf*,
}
}
void H2GlobalStreamCreator::OnCreatingStream(
void* H2GlobalStreamCreator::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) {
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
std::unique_lock<butil::Mutex> mu(_mutex);
bool need_create_agent = true;
do {
if (!(*inout)->_agent_socket ||
(*inout)->_agent_socket->Failed()) {
......@@ -1669,39 +1666,49 @@ void H2GlobalStreamCreator::OnCreatingStream(
break;
}
(*inout)->_agent_socket->ReAddress(inout);
return;
need_create_agent = false;
} while (0);
SocketId sid;
SocketOptions opt = (*inout)->_options;
opt.health_check_interval_s = -1;
// TODO(zhujiashun): Predictively create socket to improve performance
if (get_client_side_messenger()->Create(opt, &sid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create H2 socket");
return;
}
SocketUniquePtr tmp_ptr;
if (Socket::Address(sid, &tmp_ptr) != 0) {
cntl->SetFailed(EFAILEDSOCKET, "Fail to address H2 socketId=%" PRIu64, sid);
return;
if (need_create_agent) {
SocketId sid;
SocketOptions opt = (*inout)->_options;
opt.health_check_interval_s = -1;
// TODO(zhujiashun): Predictively create socket to improve performance
if (get_client_side_messenger()->Create(opt, &sid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create H2 socket");
return NULL;
}
SocketUniquePtr tmp_ptr;
if (Socket::Address(sid, &tmp_ptr) != 0) {
cntl->SetFailed(EFAILEDSOCKET, "Fail to address H2 socketId=%" PRIu64, sid);
return NULL;
}
tmp_ptr->ShareStats(inout->get());
(*inout)->_agent_socket.swap(tmp_ptr);
mu.unlock();
(*inout)->_agent_socket->ReAddress(inout);
if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference();
}
}
tmp_ptr->ShareStats(inout->get());
(*inout)->_agent_socket.swap(tmp_ptr);
mu.unlock();
(*inout)->_agent_socket->ReAddress(inout);
if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference();
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl);
if (!h2_req) {
cntl->SetFailed(ENOMEM, "Fail to create H2UnsentRequest");
return NULL;
}
return (void*)h2_req;
}
void H2GlobalStreamCreator::OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc,
StreamUserData* stream_user_data) {
void* stream_user_data) {
// [stream_user_data == NULL] means that RPC has failed before it is created.
if (stream_user_data) {
stream_user_data->OnDestroy(sending_sock, cntl);
H2UnsentRequest* h2_req = static_cast<H2UnsentRequest*>(stream_user_data);
if (h2_req) {
h2_req->Discard(sending_sock, cntl);
}
}
......
......@@ -88,9 +88,12 @@ inline Http2Bvars* get_http2_bvars() {
return butil::get_leaky_singleton<Http2Bvars>();
}
class H2UnsentRequest : public SocketMessage, public StreamUserData {
class H2UnsentRequest : public SocketMessage {
friend void PackH2Request(butil::IOBuf*, SocketMessage**,
uint64_t, const google::protobuf::MethodDescriptor*,
Controller*, const butil::IOBuf&, const Authenticator*);
public:
static H2UnsentRequest* New(Controller* c, uint64_t correlation_id);
static H2UnsentRequest* New(Controller* c);
void Describe(butil::IOBuf*) const;
int AddRefManually()
......@@ -107,9 +110,8 @@ public:
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
size_t EstimatedByteSize();
// @StreamUserData
void OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl);
void Discard(SocketUniquePtr& sending_sock, Controller* cntl);
private:
std::string& push(const std::string& name)
{ return (new (&_list[_size++]) HPacker::Header(name))->value; }
......@@ -233,12 +235,12 @@ void PackH2Request(butil::IOBuf* buf,
class H2GlobalStreamCreator : public StreamCreator {
protected:
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc,
StreamUserData* stream_user_data) override;
void* stream_user_data) override;
private:
butil::Mutex _mutex;
};
......
......@@ -1678,26 +1678,26 @@ void RtmpClientStream::SignalError() {
}
}
void RtmpClientStream::OnCreatingStream(
void* RtmpClientStream::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) {
{
std::unique_lock<butil::Mutex> mu(_state_mutex);
if (_state == STATE_ERROR || _state == STATE_DESTROYING) {
cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying");
return;
return NULL;
}
}
SocketId esid;
if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create RTMP socket");
return;
return NULL;
}
} else {
if (_client_impl->socket_map().Insert(
SocketMapKey((*inout)->remote_side()), &esid) != 0) {
cntl->SetFailed(EINVAL, "Fail to get the RTMP socket");
return;
return NULL;
}
}
SocketUniquePtr tmp_ptr;
......@@ -1705,12 +1705,13 @@ void RtmpClientStream::OnCreatingStream(
cntl->SetFailed(EFAILEDSOCKET, "Fail to address RTMP SocketId=%" PRIu64
" from SocketMap of RtmpClient=%p",
esid, _client_impl.get());
return;
return NULL;
}
RPC_VLOG << "Replace Socket For Stream, RTMP socketId=" << esid
<< ", main socketId=" << (*inout)->id();
tmp_ptr->ShareStats(inout->get());
inout->reset(tmp_ptr.release());
return NULL;
}
int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
......@@ -1749,7 +1750,7 @@ void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int /*error_code*/,
bool end_of_rpc,
StreamUserData* /*stream_data*/) {
void* /*stream_data*/) {
if (!end_of_rpc) {
if (sending_sock) {
if (_from_socketmap) {
......
......@@ -821,9 +821,9 @@ friend class RtmpRetryingClientStream;
int Publish(const butil::StringPiece& name, RtmpPublishType type);
// @StreamCreator
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr&, Controller*, int error_code,
bool end_of_rpc, StreamUserData*) override;
bool end_of_rpc, void*) override;
void OnFailedToCreateStream();
......
......@@ -22,14 +22,6 @@
namespace brpc {
class Controller;
// The stream user data on a specific Call
class StreamUserData {
public:
virtual ~StreamUserData() {}
virtual void OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl) = 0;
};
// Abstract creation of "user-level connection" over a RPC-like process.
// Lifetime of this object should be guaranteed by user during the RPC,
// generally this object is created before RPC and destroyed after RPC.
......@@ -48,8 +40,8 @@ public:
// when stream_creator is present.
// cntl: contains contexts of the RPC, if there's any error during
// replacement, call cntl->SetFailed().
virtual void OnCreatingStream(SocketUniquePtr* inout,
Controller* cntl) = 0;
virtual void* OnCreatingStream(SocketUniquePtr* inout,
Controller* cntl) = 0;
// Called when the stream is about to destroyed.
// If the RPC has retries, this function MUST be called before each retry.
......@@ -64,7 +56,7 @@ public:
Controller* cntl,
int error_code,
bool end_of_rpc,
StreamUserData* stream_user_data) = 0;
void* stream_user_data) = 0;
};
} // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment