Commit 5293d5ae authored by zhujiashun's avatar zhujiashun Committed by gejun

add max_concurrent_streams check

parent 9ec71f1d
......@@ -176,6 +176,7 @@ public:
// Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
uint32_t StreamSize();
HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; }
......@@ -404,6 +405,11 @@ bool H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) {
return false;
}
uint32_t H2Context::StreamSize() {
std::unique_lock<butil::Mutex> mu(_stream_mutex);
return _pending_streams.size();
}
const size_t FRAME_HEAD_SIZE = 9;
ParseResult H2Context::ConsumeFrameHead(
......@@ -910,9 +916,6 @@ H2ParseResult H2Context::OnWindowUpdate(
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
LOG_EVERY_SECOND(INFO) << "Receive OnWindowUpdate, stream_id=" << frame_head.stream_id
<< ", inc=" << inc << ", _remote_conn_window_size="
<< _remote_conn_window_size;
if (frame_head.stream_id == 0) {
if (!add_window_size(&_remote_conn_window_size, inc)) {
......@@ -1380,6 +1383,11 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
return butil::Status(ECANCELED, "The RPC was already failed");
}
// TODO(zhujiashun): also check this in server push
if (ctx->StreamSize() > ctx->remote_settings().max_concurrent_streams) {
return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream");
}
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
const int id = ctx->AllocateClientStreamId();
......@@ -1390,7 +1398,6 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}
H2StreamContext* sctx = _sctx.release();
sctx->Init(ctx, id);
if (!ctx->TryToInsertStream(id, sctx)) {
delete sctx;
return butil::Status(ECANCELED, "stream_id already exists");
......@@ -1419,7 +1426,6 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
// flow control
int64_t s_win = sctx->_remote_window_size.load(butil::memory_order_relaxed);
int64_t c_win = ctx->_remote_conn_window_size.load(butil::memory_order_relaxed);
LOG_EVERY_SECOND(INFO) << "s_win=" << s_win << ", c_win=" << c_win;
const int64_t sz = _cntl->request_attachment().size();
if (sz > s_win || sz > c_win) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
......
......@@ -936,7 +936,6 @@ TEST_F(HttpTest, http2_sanity) {
test::EchoRequest req;
req.set_message(EXP_REQUEST);
test::EchoResponse res;
int log_duration = 10000;
for (int i = 0; i < 200000; ++i) {
brpc::Controller cntl;
cntl.http_request().set_content_type("application/json");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment