Commit b307a42c authored by zhujiashun's avatar zhujiashun

ignore flow control in h2 when sending first request

parent f43bdcec
...@@ -331,6 +331,7 @@ H2Context::H2Context(Socket* socket, const Server* server) ...@@ -331,6 +331,7 @@ H2Context::H2Context(Socket* socket, const Server* server)
, _last_received_stream_id(-1) , _last_received_stream_id(-1)
, _last_sent_stream_id(1) , _last_sent_stream_id(1)
, _goaway_stream_id(-1) , _goaway_stream_id(-1)
, _remote_settings_received(false)
, _deferred_window_update(0) { , _deferred_window_update(0) {
// Stop printing the field which is useless for remote settings. // Stop printing the field which is useless for remote settings.
_remote_settings.connection_window_size = 0; _remote_settings.connection_window_size = 0;
...@@ -887,6 +888,7 @@ H2ParseResult H2Context::OnSettings( ...@@ -887,6 +888,7 @@ H2ParseResult H2Context::OnSettings(
LOG(WARNING) << "Fail to respond settings with ack to " << *_socket; LOG(WARNING) << "Fail to respond settings with ack to " << *_socket;
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
_remote_settings_received.store(true, butil::memory_order_release);
return MakeH2Message(NULL); return MakeH2Message(NULL);
} }
...@@ -1017,14 +1019,16 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const { ...@@ -1017,14 +1019,16 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
os << '\n'; os << '\n';
} }
const char sep = (opt.verbose ? '\n' : ' '); const char sep = (opt.verbose ? '\n' : ' ');
os << "conn_state=" << H2ConnectionState2Str(_conn_state); os << "conn_state=" << H2ConnectionState2Str(_conn_state)
os << sep << "last_received_stream_id=" << _last_received_stream_id << sep << "last_received_stream_id=" << _last_received_stream_id
<< sep << "last_sent_stream_id=" << _last_sent_stream_id; << sep << "last_sent_stream_id=" << _last_sent_stream_id
os << sep << "deferred_window_update=" << sep << "deferred_window_update="
<< _deferred_window_update.load(butil::memory_order_relaxed) << _deferred_window_update.load(butil::memory_order_relaxed)
<< sep << "remote_conn_window_left=" << sep << "remote_conn_window_left="
<< _remote_window_left.load(butil::memory_order_relaxed) << _remote_window_left.load(butil::memory_order_relaxed)
<< sep << "remote_settings=" << _remote_settings << sep << "remote_settings=" << _remote_settings
<< sep << "remote_settings_received="
<< _remote_settings_received.load(butil::memory_order_relaxed)
<< sep << "local_settings=" << _local_settings << sep << "local_settings=" << _local_settings
<< sep << "hpacker={"; << sep << "hpacker={";
IndentingOStream os2(os, 2); IndentingOStream os2(os, 2);
...@@ -1527,8 +1531,8 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1527,8 +1531,8 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
} }
_sctx->Init(ctx, id); _sctx->Init(ctx, id);
// flow control // check flow control restriction only when remote setting is received.
if (!_cntl->request_attachment().empty()) { if (!_cntl->request_attachment().empty() && ctx->is_remote_settings_received()) {
const int64_t data_size = _cntl->request_attachment().size(); const int64_t data_size = _cntl->request_attachment().size();
if (!_sctx->ConsumeWindowSize(data_size)) { if (!_sctx->ConsumeWindowSize(data_size)) {
return butil::Status(ELIMIT, "remote_window_left is not enough, data_size=%" PRId64, data_size); return butil::Status(ELIMIT, "remote_window_left is not enough, data_size=%" PRId64, data_size);
......
...@@ -353,6 +353,9 @@ public: ...@@ -353,6 +353,9 @@ public:
void DeferWindowUpdate(int64_t); void DeferWindowUpdate(int64_t);
int64_t ReleaseDeferredWindowUpdate(); int64_t ReleaseDeferredWindowUpdate();
bool is_remote_settings_received() const
{ return _remote_settings_received.load(butil::memory_order_acquire); }
private: private:
friend class H2StreamContext; friend class H2StreamContext;
friend class H2UnsentRequest; friend class H2UnsentRequest;
...@@ -387,6 +390,7 @@ friend void InitFrameHandlers(); ...@@ -387,6 +390,7 @@ friend void InitFrameHandlers();
uint32_t _last_sent_stream_id; uint32_t _last_sent_stream_id;
int _goaway_stream_id; int _goaway_stream_id;
H2Settings _remote_settings; H2Settings _remote_settings;
butil::atomic<bool> _remote_settings_received;
H2Settings _local_settings; H2Settings _local_settings;
H2Settings _unack_local_settings; H2Settings _unack_local_settings;
HPacker _hpacker; HPacker _hpacker;
......
...@@ -96,8 +96,6 @@ public: ...@@ -96,8 +96,6 @@ public:
if (sleep_ms_str) { if (sleep_ms_str) {
bthread_usleep(strtol(sleep_ms_str->data(), NULL, 10) * 1000); bthread_usleep(strtol(sleep_ms_str->data(), NULL, 10) * 1000);
} }
EXPECT_EQ(EXP_REQUEST, req->message());
res->set_message(EXP_RESPONSE); res->set_message(EXP_RESPONSE);
} }
}; };
...@@ -996,12 +994,24 @@ TEST_F(HttpTest, http2_sanity) { ...@@ -996,12 +994,24 @@ TEST_F(HttpTest, http2_sanity) {
options.protocol = "h2"; options.protocol = "h2";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options)); ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
// 1) complete flow and // Check that the first request with size larger than the default window can
// 2) socket replacement when streamId runs out, the initial streamId is a special // be sent out, when remote settings are not received.
// value set in ctor of H2Context brpc::Controller cntl;
test::EchoRequest big_req;
test::EchoResponse res;
std::string message(2 * 1024 * 1024 /* 2M */, 'x');
big_req.set_message(message);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &big_req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
// socket replacement when streamId runs out, the initial streamId is a special
// value set in ctor of H2Context so that the number 15000 is enough to run out
// of stream.
test::EchoRequest req; test::EchoRequest req;
req.set_message(EXP_REQUEST); req.set_message(EXP_REQUEST);
test::EchoResponse res;
for (int i = 0; i < 15000; ++i) { for (int i = 0; i < 15000; ++i) {
brpc::Controller cntl; brpc::Controller cntl;
cntl.http_request().set_content_type("application/json"); cntl.http_request().set_content_type("application/json");
...@@ -1113,6 +1123,14 @@ TEST_F(HttpTest, http2_window_used_up) { ...@@ -1113,6 +1123,14 @@ TEST_F(HttpTest, http2_window_used_up) {
cntl.http_request().set_content_type("application/proto"); cntl.http_request().set_content_type("application/proto");
brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req); brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
brpc::H2Settings h2_settings;
const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
brpc::policy::ParseH2Message(&buf, _h2_client_sock.get(), false, NULL);
int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size(); int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size();
for (int i = 0; i <= nsuc; i++) { for (int i = 0; i <= nsuc; i++) {
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl); brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment