Commit f12bd910 authored by zhujiashun's avatar zhujiashun Committed by gejun

- Implement H2GlobalStreamCreator::ReplaceSocketForStream

- Set stream creator in serialize_request
parent 35cd7b1c
...@@ -96,6 +96,7 @@ public: ...@@ -96,6 +96,7 @@ public:
_cntl->_request_protocol = protocol; _cntl->_request_protocol = protocol;
return *this; return *this;
} }
ProtocolType request_protocol() { return _cntl->_request_protocol; }
Span* span() const { return _cntl->_span; } Span* span() const { return _cntl->_span; }
......
...@@ -169,6 +169,7 @@ public: ...@@ -169,6 +169,7 @@ public:
void Destroy() { delete this; } void Destroy() { delete this; }
int AllocateClientStreamId(); int AllocateClientStreamId();
bool RunOutStreams();
// Try to map stream_id to ctx if stream_id does not exist before // Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise. // Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx); bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
...@@ -317,8 +318,7 @@ int H2Context::Init() { ...@@ -317,8 +318,7 @@ int H2Context::Init() {
} }
inline int H2Context::AllocateClientStreamId() { inline int H2Context::AllocateClientStreamId() {
if (_last_client_stream_id > 0x7FFFFFFF) { if (RunOutStreams()) {
// run out stream id
return -1; return -1;
} }
const int id = _last_client_stream_id; const int id = _last_client_stream_id;
...@@ -326,6 +326,14 @@ inline int H2Context::AllocateClientStreamId() { ...@@ -326,6 +326,14 @@ inline int H2Context::AllocateClientStreamId() {
return id; return id;
} }
inline bool H2Context::RunOutStreams() {
if (_last_client_stream_id > 0x7FFFFFFF) {
// run out stream id
return true;
}
return false;
}
H2StreamContext* H2Context::RemoveStream(int stream_id) { H2StreamContext* H2Context::RemoveStream(int stream_id) {
H2StreamContext* sctx = NULL; H2StreamContext* sctx = NULL;
std::unique_lock<butil::Mutex> mu(_stream_mutex); std::unique_lock<butil::Mutex> mu(_stream_mutex);
...@@ -1518,16 +1526,42 @@ void PackH2Request(butil::IOBuf*, ...@@ -1518,16 +1526,42 @@ void PackH2Request(butil::IOBuf*,
} }
} }
class H2GlobalStreamCreator : public StreamCreator {
protected:
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller* cntl,
int error_code);
};
void H2GlobalStreamCreator::ReplaceSocketForStream( void H2GlobalStreamCreator::ReplaceSocketForStream(
SocketUniquePtr*, Controller*) { SocketUniquePtr* inout, Controller* cntl) {
std::unique_lock<butil::Mutex> mu(_mutex);
do {
if (!(*inout)->_agent_socket) {
break;
}
H2Context* ctx = static_cast<H2Context*>((*inout)->_agent_socket->parsing_context());
if (ctx == NULL) {
break;
}
if (ctx->RunOutStreams()) {
break;
}
(*inout)->_agent_socket->ReAddress(inout);
return;
} while (0);
LOG(INFO) << "Ready to create h2 agent socket";
SocketId sid;
SocketOptions opt = (*inout)->_options;
// Only main socket can be the owner of ssl_ctx
opt.owns_ssl_ctx = false;
opt.health_check_interval_s = -1;
if (get_client_side_messenger()->Create(opt, &sid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create H2 socket");
return;
}
SocketUniquePtr tmp_ptr;
if (Socket::Address(sid, &tmp_ptr) != 0) {
cntl->SetFailed(EFAILEDSOCKET, "Fail to address H2 socketId=%" PRIu64, sid);
return;
}
(*inout)->_agent_socket.swap(tmp_ptr);
(*inout)->_agent_socket->ReAddress(inout);
return;
} }
void H2GlobalStreamCreator::OnStreamCreationDone( void H2GlobalStreamCreator::OnStreamCreationDone(
...@@ -1537,7 +1571,7 @@ void H2GlobalStreamCreator::OnStreamCreationDone( ...@@ -1537,7 +1571,7 @@ void H2GlobalStreamCreator::OnStreamCreationDone(
void H2GlobalStreamCreator::CleanupSocketForStream( void H2GlobalStreamCreator::CleanupSocketForStream(
Socket* prev_sock, Controller* cntl, int error_code) { Socket* prev_sock, Controller* cntl, int error_code) {
CHECK(false) << "Never run";
} }
StreamCreator* get_h2_global_stream_creator() { StreamCreator* get_h2_global_stream_creator() {
......
...@@ -211,6 +211,16 @@ void PackH2Request(butil::IOBuf* buf, ...@@ -211,6 +211,16 @@ void PackH2Request(butil::IOBuf* buf,
const butil::IOBuf& request, const butil::IOBuf& request,
const Authenticator* auth); const Authenticator* auth);
class H2GlobalStreamCreator : public StreamCreator {
protected:
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller* cntl,
int error_code);
private:
butil::Mutex _mutex;
};
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
......
...@@ -466,6 +466,12 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -466,6 +466,12 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
header->SetHeader(common->CONNECTION, common->KEEP_ALIVE); header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
} }
if (accessor.request_protocol() == PROTOCOL_HTTP2) {
cntl->set_stream_creator(get_h2_global_stream_creator());
} else {
LOG(INFO) << "in SerializeHttpRequest, is_http2=0";
}
// Set url to /ServiceName/MethodName when we're about to call protobuf // Set url to /ServiceName/MethodName when we're about to call protobuf
// services (indicated by non-NULL method). // services (indicated by non-NULL method).
const google::protobuf::MethodDescriptor* method = cntl->method(); const google::protobuf::MethodDescriptor* method = cntl->method();
......
...@@ -1061,6 +1061,8 @@ void Socket::OnRecycle() { ...@@ -1061,6 +1061,8 @@ void Socket::OnRecycle() {
delete _stream_set; delete _stream_set;
_stream_set = NULL; _stream_set = NULL;
_agent_socket.reset(NULL);
s_vars->nsocket << -1; s_vars->nsocket << -1;
} }
......
...@@ -42,6 +42,7 @@ namespace brpc { ...@@ -42,6 +42,7 @@ namespace brpc {
namespace policy { namespace policy {
class ConsistentHashingLoadBalancer; class ConsistentHashingLoadBalancer;
class RtmpContext; class RtmpContext;
class H2GlobalStreamCreator;
} // namespace policy } // namespace policy
namespace schan { namespace schan {
class ChannelBalancer; class ChannelBalancer;
...@@ -185,6 +186,7 @@ friend class policy::ConsistentHashingLoadBalancer; ...@@ -185,6 +186,7 @@ friend class policy::ConsistentHashingLoadBalancer;
friend class policy::RtmpContext; friend class policy::RtmpContext;
friend class schan::ChannelBalancer; friend class schan::ChannelBalancer;
friend class HealthCheckTask; friend class HealthCheckTask;
friend class policy::H2GlobalStreamCreator;
class SharedPart; class SharedPart;
struct Forbidden {}; struct Forbidden {};
struct WriteRequest; struct WriteRequest;
...@@ -759,6 +761,8 @@ private: ...@@ -759,6 +761,8 @@ private:
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set; std::set<StreamId> *_stream_set;
SocketUniquePtr _agent_socket;
}; };
} // namespace brpc } // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment