Commit 3bc173cb authored by zhujiashun's avatar zhujiashun

add flow control check in data receiver

parent f2eb9cb4
......@@ -470,7 +470,7 @@ ParseResult H2Context::Consume(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) {
LOG(WARNING) << "Fail to respond http2-client with settings to " << *socket
LOG(WARNING) << "Fail to respond http2-client with settings to " << *socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
} else {
......@@ -696,17 +696,24 @@ H2ParseResult H2Context::OnData(
frag_size -= pad_length;
H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
if (is_client_side()) {
RPC_VLOG << "Fail to find stream_id=" << frame_head.stream_id;
// If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state,
// the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(this, frame_head.stream_id);
tmp_sctx.OnData(it, frame_head, frag_size, pad_length);
DeferWindowUpdate(tmp_sctx.ReleaseDeferredWindowUpdate());
return MakeH2Message(NULL);
if (is_client_side()) {
RPC_VLOG << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_NO_ERROR, frame_head.stream_id);
} else {
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
return MakeH2Error(H2_STREAM_CLOSED_ERROR, frame_head.stream_id);
}
}
if (_deferred_window_update.load(butil::memory_order_relaxed) + frag_size >
local_settings().connection_window_size) {
LOG(ERROR) << "Fail to satisfy the connection-level flow control policy";
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
return sctx->OnData(it, frame_head, frag_size, pad_length);
}
......@@ -725,8 +732,12 @@ H2ParseResult H2StreamContext::OnData(
return MakeH2Error(H2_PROTOCOL_ERROR);
}
}
const int64_t acc = _deferred_window_update.fetch_add(frag_size, butil::memory_order_relaxed) + frag_size;
if (acc >= _conn_ctx->local_settings().stream_window_size / 2) {
if (acc > _conn_ctx->local_settings().stream_window_size) {
LOG(ERROR) << "Fail to satisfy the stream-level flow control policy";
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
} else if (acc >= _conn_ctx->local_settings().stream_window_size / 2) {
// Rarely happen for small messages.
const int64_t stream_wu =
_deferred_window_update.exchange(0, butil::memory_order_relaxed);
......
......@@ -22,7 +22,7 @@ DEFINE_bool(loop, false, "run until ctrl-C is pressed");
DEFINE_bool(use_futex, false, "use futex instead of pipe");
DEFINE_bool(use_butex, false, "use butex instead of pipe");
void (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN);
void ALLOW_UNUSED (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN);
volatile bool stop = false;
void quit_handler(int) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment