Commit 48127543 authored by zhujiashun's avatar zhujiashun Committed by gejun

Make StreamCreator be member of Call to make retry and backup request work

parent 62b61711
...@@ -512,7 +512,7 @@ Controller.retried_count() returns number of retries. ...@@ -512,7 +512,7 @@ Controller.retried_count() returns number of retries.
Controller.has_backup_request() tells if backup_request was sent. Controller.has_backup_request() tells if backup_request was sent.
**servers tried before are not retried by best efforts** **Servers tried before are not retried by best efforts**
Conditions for retrying (AND relations): Conditions for retrying (AND relations):
- Broken connection. - Broken connection.
......
...@@ -260,12 +260,14 @@ Controller::Call::Call(Controller::Call* rhs) ...@@ -260,12 +260,14 @@ Controller::Call::Call(Controller::Call* rhs)
, need_feedback(rhs->need_feedback) , need_feedback(rhs->need_feedback)
, peer_id(rhs->peer_id) , peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us) , begin_time_us(rhs->begin_time_us)
, sending_sock(rhs->sending_sock.release()) { , sending_sock(rhs->sending_sock.release())
, stream_creator(rhs->stream_creator) {
// NOTE: fields in rhs should be reset because RPC could fail before // NOTE: fields in rhs should be reset because RPC could fail before
// setting all the fields to next call and _current_call.OnComplete // setting all the fields to next call and _current_call.OnComplete
// will behave incorrectly. // will behave incorrectly.
rhs->need_feedback = false; rhs->need_feedback = false;
rhs->peer_id = (SocketId)-1; rhs->peer_id = (SocketId)-1;
rhs->stream_creator = NULL;
} }
Controller::Call::~Call() { Controller::Call::~Call() {
...@@ -280,6 +282,7 @@ void Controller::Call::Reset() { ...@@ -280,6 +282,7 @@ void Controller::Call::Reset() {
peer_id = (SocketId)-1; peer_id = (SocketId)-1;
begin_time_us = 0; begin_time_us = 0;
sending_sock.reset(NULL); sending_sock.reset(NULL);
stream_creator = NULL;
} }
void Controller::set_timeout_ms(int64_t timeout_ms) { void Controller::set_timeout_ms(int64_t timeout_ms) {
...@@ -619,7 +622,6 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, ...@@ -619,7 +622,6 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
_http_response->Clear(); _http_response->Clear();
} }
response_attachment().clear(); response_attachment().clear();
return IssueRPC(butil::gettimeofday_us()); return IssueRPC(butil::gettimeofday_us());
} }
...@@ -767,8 +769,8 @@ void Controller::Call::OnComplete( ...@@ -767,8 +769,8 @@ void Controller::Call::OnComplete(
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
if (c->stream_creator()) { if (stream_creator) {
c->stream_creator()->OnDestroyingStream( stream_creator->OnDestroyingStream(
sending_sock, c, error_code, end_of_rpc); sending_sock, c, error_code, end_of_rpc);
} }
......
...@@ -248,11 +248,16 @@ public: ...@@ -248,11 +248,16 @@ public:
void reset_rpc_dump_meta(RpcDumpMeta* meta); void reset_rpc_dump_meta(RpcDumpMeta* meta);
const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; } const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; }
// Attach a StreamCreator to this RPC. Notice that controller never deletes // Attach a global StreamCreator to this RPC. Notice that controller never
// the StreamCreator, you can do the deletion inside OnDestroyingStream. // deletes the StreamCreator, you can do the deletion inside OnDestroyingStream.
void set_stream_creator(StreamCreator* sc) { _stream_creator = sc; } void set_stream_creator(StreamCreator* sc) { _stream_creator = sc; }
StreamCreator* stream_creator() const { return _stream_creator; } StreamCreator* stream_creator() const { return _stream_creator; }
// Attach a StreamCreator to current call. In some protocols(such as h2), each
// call has a specific StreamCreator, use this function to set.
void set_current_stream_creator(StreamCreator* sc) { _current_call.stream_creator = sc; }
StreamCreator* current_stream_creator() const { return _current_call.stream_creator; }
// Make the RPC end when the HTTP response has complete headers and let // Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy(). // user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
...@@ -581,6 +586,7 @@ private: ...@@ -581,6 +586,7 @@ private:
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
// socket fetched from socket pool // socket fetched from socket pool
SocketUniquePtr sending_sock; SocketUniquePtr sending_sock;
StreamCreator* stream_creator;
}; };
void HandleStreamConnection(Socket *host_socket); void HandleStreamConnection(Socket *host_socket);
......
...@@ -337,6 +337,7 @@ H2StreamContext::H2StreamContext() ...@@ -337,6 +337,7 @@ H2StreamContext::H2StreamContext()
, _local_window_size(0) , _local_window_size(0)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
get_http2_bvars()->h2_stream_context_count << 1;
} }
H2StreamContext::~H2StreamContext() { H2StreamContext::~H2StreamContext() {
...@@ -344,6 +345,7 @@ H2StreamContext::~H2StreamContext() { ...@@ -344,6 +345,7 @@ H2StreamContext::~H2StreamContext() {
int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size; int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size;
_conn_ctx->ReclaimWindowSize(diff); _conn_ctx->ReclaimWindowSize(diff);
} }
get_http2_bvars()->h2_stream_context_count << -1;
} }
int H2Context::Init() { int H2Context::Init() {
...@@ -775,7 +777,7 @@ H2ParseResult H2StreamContext::OnResetStream( ...@@ -775,7 +777,7 @@ H2ParseResult H2StreamContext::OnResetStream(
} }
#endif #endif
H2StreamContext* sctx = _conn_ctx->RemoveStream(stream_id()); H2StreamContext* sctx = _conn_ctx->RemoveStream(stream_id());
if (sctx != NULL) { if (sctx == NULL) {
LOG(ERROR) << "Fail to find stream_id=" << stream_id(); LOG(ERROR) << "Fail to find stream_id=" << stream_id();
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
...@@ -1075,6 +1077,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id) ...@@ -1075,6 +1077,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
header()._h2_stream_id = stream_id; header()._h2_stream_id = stream_id;
get_http2_bvars()->h2_stream_context_count << 1;
} }
#ifdef HAS_H2_STREAM_STATE #ifdef HAS_H2_STREAM_STATE
...@@ -1321,6 +1324,7 @@ void H2UnsentRequest::Destroy() { ...@@ -1321,6 +1324,7 @@ void H2UnsentRequest::Destroy() {
} }
void H2UnsentRequest::OnCreatingStream(SocketUniquePtr*, Controller*) { void H2UnsentRequest::OnCreatingStream(SocketUniquePtr*, Controller*) {
CHECK(false) << "H2UnsentRequest::OnCreatingStream should not be called";
} }
void H2UnsentRequest::OnDestroyingStream( void H2UnsentRequest::OnDestroyingStream(
...@@ -1328,6 +1332,8 @@ void H2UnsentRequest::OnDestroyingStream( ...@@ -1328,6 +1332,8 @@ void H2UnsentRequest::OnDestroyingStream(
if (!end_of_rpc) { if (!end_of_rpc) {
return; return;
} }
// If cntl->ErrorCode == 0, then it is a normal response and stream has
// already been removed in EndRemoteStream.
if (sending_sock != NULL && cntl->ErrorCode() != 0) { if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(_cntl, cntl); CHECK_EQ(_cntl, cntl);
_mutex.lock(); _mutex.lock();
...@@ -1624,12 +1630,12 @@ void H2UnsentResponse::Describe(butil::IOBuf* desc) const { ...@@ -1624,12 +1630,12 @@ void H2UnsentResponse::Describe(butil::IOBuf* desc) const {
} }
void PackH2Request(butil::IOBuf*, void PackH2Request(butil::IOBuf*,
SocketMessage** user_message, SocketMessage** user_message,
uint64_t correlation_id, uint64_t correlation_id,
const google::protobuf::MethodDescriptor*, const google::protobuf::MethodDescriptor*,
Controller* cntl, Controller* cntl,
const butil::IOBuf&, const butil::IOBuf&,
const Authenticator* auth) { const Authenticator* auth) {
ControllerPrivateAccessor accessor(cntl); ControllerPrivateAccessor accessor(cntl);
HttpHeader* header = &cntl->http_request(); HttpHeader* header = &cntl->http_request();
...@@ -1643,13 +1649,11 @@ void PackH2Request(butil::IOBuf*, ...@@ -1643,13 +1649,11 @@ void PackH2Request(butil::IOBuf*,
// Serialize http2 request // Serialize http2 request
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl, correlation_id); H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl, correlation_id);
if (cntl->stream_creator() && h2_req->AddRefManually(); // add for OnDestroyingStream
cntl->stream_creator() != get_h2_global_stream_creator()) { if (cntl->current_stream_creator()) {
static_cast<H2UnsentRequest*>(cntl->stream_creator())->RemoveRefManually(); dynamic_cast<H2UnsentRequest*>(cntl->current_stream_creator())->RemoveRefManually();
} }
cntl->set_stream_creator(h2_req); cntl->set_current_stream_creator(h2_req);
h2_req->AddRefManually();
*user_message = h2_req; *user_message = h2_req;
if (FLAGS_http_verbose) { if (FLAGS_http_verbose) {
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "brpc/details/hpack.h" #include "brpc/details/hpack.h"
#include "brpc/stream_creator.h" #include "brpc/stream_creator.h"
#include "brpc/controller.h" #include "brpc/controller.h"
#include "bvar/bvar.h"
namespace brpc { namespace brpc {
...@@ -74,6 +75,19 @@ enum H2StreamState { ...@@ -74,6 +75,19 @@ enum H2StreamState {
}; };
const char* H2StreamState2Str(H2StreamState); const char* H2StreamState2Str(H2StreamState);
struct Http2Bvars {
bvar::Adder<int> h2_unsent_request_count;
bvar::Adder<int> h2_stream_context_count;
Http2Bvars()
: h2_unsent_request_count("h2_unsent_request_count")
, h2_stream_context_count("h2_stream_context_count") {
}
};
inline Http2Bvars* get_http2_bvars() {
return butil::get_leaky_singleton<Http2Bvars>();
}
class H2UnsentRequest : public SocketMessage, public StreamCreator { class H2UnsentRequest : public SocketMessage, public StreamCreator {
public: public:
static H2UnsentRequest* New(Controller* c, uint64_t correlation_id); static H2UnsentRequest* New(Controller* c, uint64_t correlation_id);
...@@ -108,8 +122,12 @@ private: ...@@ -108,8 +122,12 @@ private:
: _nref(1) : _nref(1)
, _size(0) , _size(0)
, _stream_id(0) , _stream_id(0)
, _cntl(c) {} , _cntl(c) {
~H2UnsentRequest() {} get_http2_bvars()->h2_unsent_request_count << 1;
}
~H2UnsentRequest() {
get_http2_bvars()->h2_unsent_request_count << -1;
}
H2UnsentRequest(const H2UnsentRequest&); H2UnsentRequest(const H2UnsentRequest&);
void operator=(const H2UnsentRequest&); void operator=(const H2UnsentRequest&);
void Destroy(); void Destroy();
......
...@@ -2134,7 +2134,10 @@ void RtmpClientStream::Init(const RtmpClient* client, ...@@ -2134,7 +2134,10 @@ void RtmpClientStream::Init(const RtmpClient* client,
_options = options; _options = options;
OnClientStreamCreated* done = new OnClientStreamCreated; OnClientStreamCreated* done = new OnClientStreamCreated;
done->stream.reset(this); done->stream.reset(this);
// In RTMP, stream_creator and current stream_creator is always
// this RtmpClientStream.
done->cntl.set_stream_creator(this); done->cntl.set_stream_creator(this);
done->cntl.set_current_stream_creator(this);
done->cntl.set_connection_type(_options.share_connection ? done->cntl.set_connection_type(_options.share_connection ?
CONNECTION_TYPE_SINGLE : CONNECTION_TYPE_SINGLE :
CONNECTION_TYPE_SHORT); CONNECTION_TYPE_SHORT);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment