Commit 30284e4f authored by gejun's avatar gejun

Simplify H2GlobalStreamCreator::OnCreatingStream & Not fail socket when running…

Simplify H2GlobalStreamCreator::OnCreatingStream & Not fail socket when running out of streams & misc changes
parent 007a8678
...@@ -846,8 +846,11 @@ ssize_t HPacker::Decode(butil::IOBufBytesIterator& iter, Header* h) { ...@@ -846,8 +846,11 @@ ssize_t HPacker::Decode(butil::IOBufBytesIterator& iter, Header* h) {
} }
void HPacker::Describe(std::ostream& os, const DescribeOptions& opt) const { void HPacker::Describe(std::ostream& os, const DescribeOptions& opt) const {
if (opt.verbose) {
os << '\n';
}
const char sep = (opt.verbose ? '\n' : ' '); const char sep = (opt.verbose ? '\n' : ' ');
os << (opt.verbose ? sep : '{') << "encode_table="; os << "encode_table=";
if (_encode_table) { if (_encode_table) {
_encode_table->Print(os); _encode_table->Print(os);
} else { } else {
...@@ -859,8 +862,8 @@ void HPacker::Describe(std::ostream& os, const DescribeOptions& opt) const { ...@@ -859,8 +862,8 @@ void HPacker::Describe(std::ostream& os, const DescribeOptions& opt) const {
} else { } else {
os << "null"; os << "null";
} }
if (!opt.verbose) { if (opt.verbose) {
os << '}'; os << '\n';
} }
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "brpc/details/controller_private_accessor.h" #include "brpc/details/controller_private_accessor.h"
#include "brpc/server.h" #include "brpc/server.h"
#include "butil/base64.h" #include "butil/base64.h"
#include "brpc/log.h"
namespace brpc { namespace brpc {
...@@ -176,7 +177,7 @@ public: ...@@ -176,7 +177,7 @@ public:
void Destroy() { delete this; } void Destroy() { delete this; }
int AllocateClientStreamId(); int AllocateClientStreamId();
bool RunOutStreams(); bool RunOutStreams() const;
// Try to map stream_id to ctx if stream_id does not exist before // Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise. // Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx); bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
...@@ -236,7 +237,6 @@ friend void InitFrameHandlers(); ...@@ -236,7 +237,6 @@ friend void InitFrameHandlers();
typedef butil::FlatMap<int, H2StreamContext*> StreamMap; typedef butil::FlatMap<int, H2StreamContext*> StreamMap;
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
StreamMap _pending_streams; StreamMap _pending_streams;
butil::Mutex _conn_window_mutex;;
butil::atomic<int64_t> _pending_conn_window_size; butil::atomic<int64_t> _pending_conn_window_size;
}; };
...@@ -337,7 +337,9 @@ H2StreamContext::H2StreamContext() ...@@ -337,7 +337,9 @@ H2StreamContext::H2StreamContext()
, _local_window_size(0) , _local_window_size(0)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << 1; get_http2_bvars()->h2_stream_context_count << 1;
#endif
} }
H2StreamContext::~H2StreamContext() { H2StreamContext::~H2StreamContext() {
...@@ -345,7 +347,9 @@ H2StreamContext::~H2StreamContext() { ...@@ -345,7 +347,9 @@ H2StreamContext::~H2StreamContext() {
int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size; int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size;
_conn_ctx->ReclaimWindowSize(diff); _conn_ctx->ReclaimWindowSize(diff);
} }
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << -1; get_http2_bvars()->h2_stream_context_count << -1;
#endif
} }
int H2Context::Init() { int H2Context::Init() {
...@@ -369,12 +373,8 @@ inline int H2Context::AllocateClientStreamId() { ...@@ -369,12 +373,8 @@ inline int H2Context::AllocateClientStreamId() {
return id; return id;
} }
inline bool H2Context::RunOutStreams() { inline bool H2Context::RunOutStreams() const {
if (_last_client_stream_id > 0x7FFFFFFF) { return (_last_client_stream_id > 0x7FFFFFFF);
// run out stream id
return true;
}
return false;
} }
H2StreamContext* H2Context::RemoveStream(int stream_id) { H2StreamContext* H2Context::RemoveStream(int stream_id) {
...@@ -945,10 +945,12 @@ H2ParseResult H2Context::OnWindowUpdate( ...@@ -945,10 +945,12 @@ H2ParseResult H2Context::OnWindowUpdate(
} }
void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const { void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
if (opt.verbose) {
os << '\n';
}
const char sep = (opt.verbose ? '\n' : ' '); const char sep = (opt.verbose ? '\n' : ' ');
os << (opt.verbose ? sep : '{') os << "remote_conn_window_size=" << _remote_conn_window_size
<< "remote_conn_window_size=" << _remote_conn_window_size << sep << "conn_state=" << H2ConnectionState2Str(_conn_state);
<< sep << "state=" << H2ConnectionState2Str(_conn_state);
if (is_server_side()) { if (is_server_side()) {
os << sep << "last_server_stream_id=" << _last_server_stream_id; os << sep << "last_server_stream_id=" << _last_server_stream_id;
} else { } else {
...@@ -956,11 +958,12 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const { ...@@ -956,11 +958,12 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
} }
os << sep << "remote_settings=" << _remote_settings os << sep << "remote_settings=" << _remote_settings
<< sep << "local_settings=" << _local_settings << sep << "local_settings=" << _local_settings
<< sep << "hpacker="; << sep << "hpacker={";
IndentingOStream os2(os, 2); IndentingOStream os2(os, 2);
_hpacker.Describe(os2, opt); _hpacker.Describe(os2, opt);
if (!opt.verbose) { os << '}';
os << '}'; if (opt.verbose) {
os << '\n';
} }
} }
...@@ -1075,7 +1078,9 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id) ...@@ -1075,7 +1078,9 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
header()._h2_stream_id = stream_id; header()._h2_stream_id = stream_id;
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << 1; get_http2_bvars()->h2_stream_context_count << 1;
#endif
} }
#ifdef HAS_H2_STREAM_STATE #ifdef HAS_H2_STREAM_STATE
...@@ -1363,8 +1368,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1363,8 +1368,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
ctx = new H2Context(socket, NULL); ctx = new H2Context(socket, NULL);
if (ctx->Init() != 0) { if (ctx->Init() != 0) {
delete ctx; delete ctx;
socket->SetFailed(EFAILEDSOCKET, "Fail to init H2Context"); return butil::Status(EINVAL, "Fail to init H2Context");
return butil::Status(EFAILEDSOCKET, "Fail to init H2Context");
} }
socket->initialize_parsing_context(&ctx); socket->initialize_parsing_context(&ctx);
...@@ -1376,6 +1380,8 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1376,6 +1380,8 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
out->append(headbuf, FRAME_HEAD_SIZE); out->append(headbuf, FRAME_HEAD_SIZE);
} }
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
if (_cntl == NULL) { if (_cntl == NULL) {
return butil::Status(ECANCELED, "The RPC was already failed"); return butil::Status(ECANCELED, "The RPC was already failed");
...@@ -1386,13 +1392,14 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1386,13 +1392,14 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream"); return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream");
} }
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
const int id = ctx->AllocateClientStreamId(); const int id = ctx->AllocateClientStreamId();
if (id < 0) { if (id < 0) {
// OK to fail in http2, choose a retryable errno // The RPC should be failed and retried.
socket->SetFailed(EFAILEDSOCKET, "Fail to create http2 stream"); // Note that the socket should not be SetFailed() which may affect
return butil::Status(EFAILEDSOCKET, "Fail to create http2 stream"); // other RPC successfully sent requests and waiting for responses.
RPC_VLOG << "Fail to allocate stream_id on socket=" << *socket
<< " h2req=" << (StreamUserData*)this;
return butil::Status(EH2RUNOUTSTREAMS, "Fail to allocate stream_id");
} }
H2StreamContext* sctx = _sctx.release(); H2StreamContext* sctx = _sctx.release();
sctx->Init(ctx, id); sctx->Init(ctx, id);
...@@ -1648,36 +1655,16 @@ void PackH2Request(butil::IOBuf*, ...@@ -1648,36 +1655,16 @@ void PackH2Request(butil::IOBuf*,
} }
} }
static bool IsH2SocketValid(Socket* s) {
H2Context* c = static_cast<H2Context*>(s->parsing_context());
return (c == NULL || !c->RunOutStreams());
}
StreamUserData* H2GlobalStreamCreator::OnCreatingStream( StreamUserData* H2GlobalStreamCreator::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) { SocketUniquePtr* inout, Controller* cntl) {
std::unique_lock<butil::Mutex> mu(_mutex); if ((*inout)->GetAgentSocket(inout, IsH2SocketValid) != 0) {
SocketUniquePtr& agent_sock = (*inout)->_agent_socket; cntl->SetFailed(EINTERNAL, "Fail to create agent socket");
if (!agent_sock || agent_sock->Failed() || return NULL;
(agent_sock->parsing_context() &&
static_cast<H2Context*>(agent_sock->parsing_context())->RunOutStreams())) {
// Create a new agent socket
SocketId sid;
SocketOptions opt = (*inout)->_options;
opt.health_check_interval_s = -1;
// TODO(zhujiashun): Predictively create socket to improve performance
if (get_client_side_messenger()->Create(opt, &sid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create H2 socket");
return NULL;
}
SocketUniquePtr tmp_ptr;
if (Socket::Address(sid, &tmp_ptr) != 0) {
cntl->SetFailed(EFAILEDSOCKET, "Fail to address H2 socketId=%" PRIu64, sid);
return NULL;
}
tmp_ptr->ShareStats(inout->get());
(*inout)->_agent_socket.swap(tmp_ptr);
mu.unlock();
(*inout)->_agent_socket->ReAddress(inout);
if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference();
}
} else {
agent_sock->ReAddress(inout);
} }
H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl); H2UnsentRequest* h2_req = H2UnsentRequest::New(cntl);
......
...@@ -75,6 +75,7 @@ enum H2StreamState { ...@@ -75,6 +75,7 @@ enum H2StreamState {
}; };
const char* H2StreamState2Str(H2StreamState); const char* H2StreamState2Str(H2StreamState);
#ifndef NDEBUG
struct Http2Bvars { struct Http2Bvars {
bvar::Adder<int> h2_unsent_request_count; bvar::Adder<int> h2_unsent_request_count;
bvar::Adder<int> h2_stream_context_count; bvar::Adder<int> h2_stream_context_count;
...@@ -87,6 +88,7 @@ struct Http2Bvars { ...@@ -87,6 +88,7 @@ struct Http2Bvars {
inline Http2Bvars* get_http2_bvars() { inline Http2Bvars* get_http2_bvars() {
return butil::get_leaky_singleton<Http2Bvars>(); return butil::get_leaky_singleton<Http2Bvars>();
} }
#endif
class H2UnsentRequest : public SocketMessage, public StreamUserData { class H2UnsentRequest : public SocketMessage, public StreamUserData {
friend void PackH2Request(butil::IOBuf*, SocketMessage**, friend void PackH2Request(butil::IOBuf*, SocketMessage**,
...@@ -128,10 +130,14 @@ private: ...@@ -128,10 +130,14 @@ private:
, _size(0) , _size(0)
, _stream_id(0) , _stream_id(0)
, _cntl(c) { , _cntl(c) {
#ifndef NDEBUG
get_http2_bvars()->h2_unsent_request_count << 1; get_http2_bvars()->h2_unsent_request_count << 1;
#endif
} }
~H2UnsentRequest() { ~H2UnsentRequest() {
#ifndef NDEBUG
get_http2_bvars()->h2_unsent_request_count << -1; get_http2_bvars()->h2_unsent_request_count << -1;
#endif
} }
H2UnsentRequest(const H2UnsentRequest&); H2UnsentRequest(const H2UnsentRequest&);
void operator=(const H2UnsentRequest&); void operator=(const H2UnsentRequest&);
...@@ -241,8 +247,6 @@ class H2GlobalStreamCreator : public StreamCreator { ...@@ -241,8 +247,6 @@ class H2GlobalStreamCreator : public StreamCreator {
protected: protected:
StreamUserData* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override; StreamUserData* OnCreatingStream(SocketUniquePtr* inout, Controller* cntl) override;
void DestroyStreamCreator(Controller* cntl) override; void DestroyStreamCreator(Controller* cntl) override;
private:
butil::Mutex _mutex;
}; };
} // namespace policy } // namespace policy
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment