Commit d9fe819d authored by zhujiashun's avatar zhujiashun Committed by gejun

* Abstract stream-level data as StreamUserData

* Apply StreamUserData to h2 and rtmp
parent 48127543
...@@ -261,13 +261,13 @@ Controller::Call::Call(Controller::Call* rhs) ...@@ -261,13 +261,13 @@ Controller::Call::Call(Controller::Call* rhs)
, peer_id(rhs->peer_id) , peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us) , begin_time_us(rhs->begin_time_us)
, sending_sock(rhs->sending_sock.release()) , sending_sock(rhs->sending_sock.release())
, stream_creator(rhs->stream_creator) { , stream_user_data(rhs->stream_user_data) {
// NOTE: fields in rhs should be reset because RPC could fail before // NOTE: fields in rhs should be reset because RPC could fail before
// setting all the fields to next call and _current_call.OnComplete // setting all the fields to next call and _current_call.OnComplete
// will behave incorrectly. // will behave incorrectly.
rhs->need_feedback = false; rhs->need_feedback = false;
rhs->peer_id = (SocketId)-1; rhs->peer_id = (SocketId)-1;
rhs->stream_creator = NULL; rhs->stream_user_data = NULL;
} }
Controller::Call::~Call() { Controller::Call::~Call() {
...@@ -282,7 +282,7 @@ void Controller::Call::Reset() { ...@@ -282,7 +282,7 @@ void Controller::Call::Reset() {
peer_id = (SocketId)-1; peer_id = (SocketId)-1;
begin_time_us = 0; begin_time_us = 0;
sending_sock.reset(NULL); sending_sock.reset(NULL);
stream_creator = NULL; stream_user_data = NULL;
} }
void Controller::set_timeout_ms(int64_t timeout_ms) { void Controller::set_timeout_ms(int64_t timeout_ms) {
...@@ -769,13 +769,13 @@ void Controller::Call::OnComplete( ...@@ -769,13 +769,13 @@ void Controller::Call::OnComplete(
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (stream_creator) { if (c->stream_creator()) {
stream_creator->OnDestroyingStream( c->stream_creator()->OnDestroyingStream(
sending_sock, c, error_code, end_of_rpc); sending_sock, c, error_code, end_of_rpc, stream_user_data);
} }
// Release the `Socket' we used to send/receive data // Release the `Socket' we used to send/receive data
sending_sock.reset(NULL); sending_sock.reset(NULL);
stream_user_data = NULL;
} }
void Controller::EndRPC(const CompletionInfo& info) { void Controller::EndRPC(const CompletionInfo& info) {
......
...@@ -248,16 +248,11 @@ public: ...@@ -248,16 +248,11 @@ public:
void reset_rpc_dump_meta(RpcDumpMeta* meta); void reset_rpc_dump_meta(RpcDumpMeta* meta);
const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; } const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; }
// Attach a global StreamCreator to this RPC. Notice that controller never // Attach a StreamCreator to this RPC. Notice that controller never deletes
// deletes the StreamCreator, you can do the deletion inside OnDestroyingStream. // the StreamCreator, you can do the deletion inside OnDestroyingStream.
void set_stream_creator(StreamCreator* sc) { _stream_creator = sc; } void set_stream_creator(StreamCreator* sc) { _stream_creator = sc; }
StreamCreator* stream_creator() const { return _stream_creator; } StreamCreator* stream_creator() const { return _stream_creator; }
// Attach a StreamCreator to current call. In some protocols(such as h2), each
// call has a specific StreamCreator, use this function to set.
void set_current_stream_creator(StreamCreator* sc) { _current_call.stream_creator = sc; }
StreamCreator* current_stream_creator() const { return _current_call.stream_creator; }
// Make the RPC end when the HTTP response has complete headers and let // Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy(). // user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
...@@ -586,7 +581,7 @@ private: ...@@ -586,7 +581,7 @@ private:
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
// socket fetched from socket pool // socket fetched from socket pool
SocketUniquePtr sending_sock; SocketUniquePtr sending_sock;
StreamCreator* stream_creator; StreamUserData* stream_user_data;
}; };
void HandleStreamConnection(Socket *host_socket); void HandleStreamConnection(Socket *host_socket);
......
...@@ -67,6 +67,11 @@ public: ...@@ -67,6 +67,11 @@ public:
_cntl->_current_call.sending_sock.reset(ptr.release()); _cntl->_current_call.sending_sock.reset(ptr.release());
} }
ControllerPrivateAccessor &set_stream_user_data(StreamUserData* ptr){
_cntl->_current_call.stream_user_data = ptr;
return *this;
}
ControllerPrivateAccessor &set_security_mode(bool security_mode) { ControllerPrivateAccessor &set_security_mode(bool security_mode) {
_cntl->set_flag(Controller::FLAGS_SECURITY_MODE, security_mode); _cntl->set_flag(Controller::FLAGS_SECURITY_MODE, security_mode);
return *this; return *this;
......
...@@ -1323,39 +1323,27 @@ void H2UnsentRequest::Destroy() { ...@@ -1323,39 +1323,27 @@ void H2UnsentRequest::Destroy() {
free(this); free(this);
} }
void H2UnsentRequest::OnCreatingStream(SocketUniquePtr*, Controller*) { struct RemoveRefOnQuit {
CHECK(false) << "H2UnsentRequest::OnCreatingStream should not be called"; RemoveRefOnQuit(H2UnsentRequest* msg) : _msg(msg) {}
} ~RemoveRefOnQuit() { _msg->RemoveRefManually(); }
private:
DISALLOW_COPY_AND_ASSIGN(RemoveRefOnQuit);
H2UnsentRequest* _msg;
};
void H2UnsentRequest::OnDestroyingStream( void H2UnsentRequest::OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl) {
SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) { RemoveRefOnQuit deref_self(this);
if (!end_of_rpc) {
return;
}
// If cntl->ErrorCode == 0, then it is a normal response and stream has
// already been removed in EndRemoteStream.
if (sending_sock != NULL && cntl->ErrorCode() != 0) { if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(_cntl, cntl); CHECK_EQ(cntl, _cntl);
_mutex.lock(); std::unique_lock<butil::Mutex> mu(_mutex);
_cntl = NULL; _cntl = NULL;
if (_stream_id != 0) { if (_stream_id != 0) {
H2Context* ctx = H2Context* ctx = static_cast<H2Context*>(sending_sock->parsing_context());
static_cast<H2Context*>(sending_sock->parsing_context());
ctx->AddAbandonedStream(_stream_id); ctx->AddAbandonedStream(_stream_id);
} }
_mutex.unlock();
} }
RemoveRefManually();
} }
struct RemoveRefOnQuit {
RemoveRefOnQuit(H2UnsentRequest* msg) : _msg(msg) {}
~RemoveRefOnQuit() { _msg->RemoveRefManually(); }
private:
DISALLOW_COPY_AND_ASSIGN(RemoveRefOnQuit);
H2UnsentRequest* _msg;
};
// bvar::Adder<int64_t> g_append_request_time; // bvar::Adder<int64_t> g_append_request_time;
// bvar::PerSecond<bvar::Adder<int64_t> > g_append_request_time_per_second( // bvar::PerSecond<bvar::Adder<int64_t> > g_append_request_time_per_second(
// "h2_append_request_second", &g_append_request_time); // "h2_append_request_second", &g_append_request_time);
...@@ -1649,11 +1637,11 @@ void PackH2Request(butil::IOBuf*, ...@@ -1649,11 +1637,11 @@ void PackH2Request(butil::IOBuf*,
// Serialize http2 request // Serialize http2 request
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl, correlation_id); H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl, correlation_id);
h2_req->AddRefManually(); // add for OnDestroyingStream if (!h2_req) {
if (cntl->current_stream_creator()) { return cntl->SetFailed(ENOMEM, "Fail to create H2UnsentRequest");
dynamic_cast<H2UnsentRequest*>(cntl->current_stream_creator())->RemoveRefManually();
} }
cntl->set_current_stream_creator(h2_req); h2_req->AddRefManually(); // add ref for H2UnsentRequest::OnDestroy
accessor.set_stream_user_data(h2_req);
*user_message = h2_req; *user_message = h2_req;
if (FLAGS_http_verbose) { if (FLAGS_http_verbose) {
...@@ -1706,10 +1694,15 @@ void H2GlobalStreamCreator::OnCreatingStream( ...@@ -1706,10 +1694,15 @@ void H2GlobalStreamCreator::OnCreatingStream(
} }
} }
void H2GlobalStreamCreator::OnDestroyingStream( void H2GlobalStreamCreator::OnDestroyingStream(SocketUniquePtr& sending_sock,
SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) { Controller* cntl,
// If any error happens during the time of sending rpc, this function int error_code,
// would be called. Currently just do nothing. bool end_of_rpc,
StreamUserData* stream_user_data) {
// [stream_user_data == NULL] means that RPC has failed before it is created.
if (stream_user_data) {
stream_user_data->OnDestroy(sending_sock, cntl);
}
} }
StreamCreator* get_h2_global_stream_creator() { StreamCreator* get_h2_global_stream_creator() {
......
...@@ -88,7 +88,7 @@ inline Http2Bvars* get_http2_bvars() { ...@@ -88,7 +88,7 @@ inline Http2Bvars* get_http2_bvars() {
return butil::get_leaky_singleton<Http2Bvars>(); return butil::get_leaky_singleton<Http2Bvars>();
} }
class H2UnsentRequest : public SocketMessage, public StreamCreator { class H2UnsentRequest : public SocketMessage, public StreamUserData {
public: public:
static H2UnsentRequest* New(Controller* c, uint64_t correlation_id); static H2UnsentRequest* New(Controller* c, uint64_t correlation_id);
void Describe(butil::IOBuf*) const; void Describe(butil::IOBuf*) const;
...@@ -103,14 +103,13 @@ public: ...@@ -103,14 +103,13 @@ public:
} }
} }
// @StreamCreator
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) override;
// @SocketMessage // @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*); butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
size_t EstimatedByteSize(); size_t EstimatedByteSize();
// @StreamUserData
void OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl);
private: private:
std::string& push(const std::string& name) std::string& push(const std::string& name)
{ return (new (&_list[_size++]) HPacker::Header(name))->value; } { return (new (&_list[_size++]) HPacker::Header(name))->value; }
...@@ -235,7 +234,11 @@ void PackH2Request(butil::IOBuf* buf, ...@@ -235,7 +234,11 @@ void PackH2Request(butil::IOBuf* buf,
class H2GlobalStreamCreator : public StreamCreator { class H2GlobalStreamCreator : public StreamCreator {
protected: protected:
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override; void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr& sending_sock, Controller* cntl, int error_code, bool end_of_rpc) override; void OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc,
StreamUserData* stream_user_data) override;
private: private:
butil::Mutex _mutex; butil::Mutex _mutex;
}; };
......
...@@ -1748,7 +1748,8 @@ void RtmpClientStream::OnFailedToCreateStream() { ...@@ -1748,7 +1748,8 @@ void RtmpClientStream::OnFailedToCreateStream() {
void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock, void RtmpClientStream::OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl, Controller* cntl,
int /*error_code*/, int /*error_code*/,
bool end_of_rpc) { bool end_of_rpc,
StreamUserData* /*stream_data*/) {
if (!end_of_rpc) { if (!end_of_rpc) {
if (sending_sock) { if (sending_sock) {
if (_from_socketmap) { if (_from_socketmap) {
...@@ -2137,7 +2138,6 @@ void RtmpClientStream::Init(const RtmpClient* client, ...@@ -2137,7 +2138,6 @@ void RtmpClientStream::Init(const RtmpClient* client,
// In RTMP, stream_creator and current stream_creator is always // In RTMP, stream_creator and current stream_creator is always
// this RtmpClientStream. // this RtmpClientStream.
done->cntl.set_stream_creator(this); done->cntl.set_stream_creator(this);
done->cntl.set_current_stream_creator(this);
done->cntl.set_connection_type(_options.share_connection ? done->cntl.set_connection_type(_options.share_connection ?
CONNECTION_TYPE_SINGLE : CONNECTION_TYPE_SINGLE :
CONNECTION_TYPE_SHORT); CONNECTION_TYPE_SHORT);
......
...@@ -822,7 +822,8 @@ friend class RtmpRetryingClientStream; ...@@ -822,7 +822,8 @@ friend class RtmpRetryingClientStream;
// @StreamCreator // @StreamCreator
void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override; void OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void OnDestroyingStream(SocketUniquePtr&, Controller*, int error_code, bool end_of_rpc) override; void OnDestroyingStream(SocketUniquePtr&, Controller*, int error_code,
bool end_of_rpc, StreamUserData*) override;
void OnFailedToCreateStream(); void OnFailedToCreateStream();
......
...@@ -19,10 +19,17 @@ ...@@ -19,10 +19,17 @@
#include "brpc/socket_id.h" #include "brpc/socket_id.h"
namespace brpc { namespace brpc {
class Controller; class Controller;
// The stream user data on a specific Call
class StreamUserData {
public:
virtual ~StreamUserData() {}
virtual void OnDestroy(SocketUniquePtr& sending_sock, Controller* cntl) = 0;
};
// Abstract creation of "user-level connection" over a RPC-like process. // Abstract creation of "user-level connection" over a RPC-like process.
// Lifetime of this object should be guaranteed by user during the RPC, // Lifetime of this object should be guaranteed by user during the RPC,
// generally this object is created before RPC and destroyed after RPC. // generally this object is created before RPC and destroyed after RPC.
...@@ -52,10 +59,12 @@ public: ...@@ -52,10 +59,12 @@ public:
// cntl: contexts of the RPC // cntl: contexts of the RPC
// error_code: Use this instead of cntl->ErrorCode() // error_code: Use this instead of cntl->ErrorCode()
// end_of_rpc: true if the RPC is about to destroyed. // end_of_rpc: true if the RPC is about to destroyed.
// stream_user_data: the corresponding user data of this very stream
virtual void OnDestroyingStream(SocketUniquePtr& sending_sock, virtual void OnDestroyingStream(SocketUniquePtr& sending_sock,
Controller* cntl, Controller* cntl,
int error_code, int error_code,
bool end_of_rpc) = 0; bool end_of_rpc,
StreamUserData* stream_user_data) = 0;
}; };
} // namespace brpc } // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment