Unverified Commit 57d9554c authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #515 from brpc/http2

Add more info to h2 log
parents fd45ffc1 45a1aaaa
......@@ -428,7 +428,7 @@ ParseResult H2Context::ConsumeFrameHead(
const uint32_t length = ((uint32_t)length_buf[0] << 16)
| ((uint32_t)length_buf[1] << 8) | length_buf[2];
if (length > _local_settings.max_frame_size) {
LOG(ERROR) << "Too large length=" << length << " max="
LOG(ERROR) << "Too large frame length=" << length << " max="
<< _local_settings.max_frame_size;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
......@@ -470,7 +470,7 @@ ParseResult H2Context::Consume(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) {
LOG(WARNING) << "Fail to respond http2-client with settings";
LOG(WARNING) << "Fail to respond http2-client with settings to " << *socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
} else {
......@@ -502,9 +502,21 @@ ParseResult H2Context::Consume(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send RST_STREAM";
LOG(WARNING) << "Fail to send RST_STREAM to " << *_socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
H2StreamContext* sctx = RemoveStream(h2_res.stream_id());
if (sctx) {
DeferWindowUpdate(sctx->ReleaseDeferredWindowUpdate());
if (is_server_side()) {
delete sctx;
return MakeMessage(NULL);
} else {
sctx->header().set_status_code(
H2ErrorToStatusCode(h2_res.error()));
return MakeMessage(sctx);
}
}
return MakeMessage(NULL);
} else { // send GOAWAY
char goawaybuf[FRAME_HEAD_SIZE + 4];
......@@ -516,7 +528,7 @@ ParseResult H2Context::Consume(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send GOAWAY";
LOG(WARNING) << "Fail to send GOAWAY to " << *_socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
return MakeMessage(NULL);
......@@ -570,6 +582,11 @@ H2ParseResult H2Context::OnHeaders(
LOG(ERROR) << "Invalid stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
if (VolatilePendingStreamSize() >= local_settings().max_concurrent_streams) {
LOG(ERROR) << "Reached max concurrent stream="
<< local_settings().max_concurrent_streams;
return MakeH2Error(H2_REFUSED_STREAM);
}
_last_server_stream_id = frame_head.stream_id;
sctx = new H2StreamContext(_socket->is_read_progressive());
sctx->Init(this, frame_head.stream_id);
......@@ -610,7 +627,8 @@ H2ParseResult H2StreamContext::OnHeaders(
#endif
butil::IOBufBytesIterator it2(it, frag_size);
if (ConsumeHeaders(it2) < 0) {
LOG(ERROR) << "Invalid header, frag_size=" << frag_size;
LOG(ERROR) << "Invalid header, frag_size=" << frag_size
<< ", stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
const size_t nskip = frag_size - it2.bytes_left();
......@@ -622,7 +640,8 @@ H2ParseResult H2StreamContext::OnHeaders(
it.forward(pad_length);
if (frame_head.flags & H2_FLAGS_END_HEADERS) {
if (it2.bytes_left() != 0) {
LOG(ERROR) << "Incomplete header";
LOG(ERROR) << "Incomplete header: payload_size=" << frame_head.payload_size
<< ", stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
if (frame_head.flags & H2_FLAGS_END_STREAM) {
......@@ -664,13 +683,15 @@ H2ParseResult H2StreamContext::OnContinuation(
const size_t size = _remaining_header_fragment.size();
butil::IOBufBytesIterator it2(_remaining_header_fragment);
if (ConsumeHeaders(it2) < 0) {
LOG(ERROR) << "Invalid header";
LOG(ERROR) << "Invalid header: payload_size=" << frame_head.payload_size
<< ", stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
_remaining_header_fragment.pop_front(size - it2.bytes_left());
if (frame_head.flags & H2_FLAGS_END_HEADERS) {
if (it2.bytes_left() != 0) {
LOG(ERROR) << "Incomplete header";
LOG(ERROR) << "Incomplete header: payload_size=" << frame_head.payload_size
<< ", stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
if (_stream_ended) {
......@@ -695,18 +716,15 @@ H2ParseResult H2Context::OnData(
frag_size -= pad_length;
H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
if (is_client_side()) {
RPC_VLOG << "Fail to find stream_id=" << frame_head.stream_id;
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(false);
tmp_sctx.Init(this, frame_head.stream_id);
tmp_sctx.OnData(it, frame_head, frag_size, pad_length);
DeferWindowUpdate(tmp_sctx.ReleaseDeferredWindowUpdate());
return MakeH2Message(NULL);
} else {
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
// If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state,
// the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(this, frame_head.stream_id);
tmp_sctx.OnData(it, frame_head, frag_size, pad_length);
DeferWindowUpdate(tmp_sctx.ReleaseDeferredWindowUpdate());
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_STREAM_CLOSED_ERROR, frame_head.stream_id);
}
return sctx->OnData(it, frame_head, frag_size, pad_length);
}
......@@ -725,8 +743,13 @@ H2ParseResult H2StreamContext::OnData(
return MakeH2Error(H2_PROTOCOL_ERROR);
}
}
const int64_t acc = _deferred_window_update.fetch_add(frag_size, butil::memory_order_relaxed) + frag_size;
if (acc >= _conn_ctx->local_settings().stream_window_size / 2) {
if (acc > _conn_ctx->local_settings().stream_window_size) {
LOG(ERROR) << "Fail to satisfy the stream-level flow control policy";
return MakeH2Error(H2_FLOW_CONTROL_ERROR, frame_head.stream_id);
}
// Rarely happen for small messages.
const int64_t stream_wu =
_deferred_window_update.exchange(0, butil::memory_order_relaxed);
......@@ -748,7 +771,7 @@ H2ParseResult H2StreamContext::OnData(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
LOG(WARNING) << "Fail to send WINDOW_UPDATE to " << *_conn_ctx->_socket;
return MakeH2Error(H2_INTERNAL_ERROR);
}
}
......@@ -820,6 +843,7 @@ H2ParseResult H2StreamContext::EndRemoteStream() {
RPC_VLOG << "Fail to find stream_id=" << stream_id();
return MakeH2Message(NULL);
}
CHECK_EQ(sctx, this);
// The remote stream will not send any more data, sending back the
// stream-level WINDOW_UPDATE is pointless, just move the value into
// the connection.
......@@ -878,7 +902,7 @@ H2ParseResult H2Context::OnSettings(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to respond settings with ack";
LOG(WARNING) << "Fail to respond settings with ack to " << *_socket;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
return MakeH2Message(NULL);
......@@ -918,7 +942,7 @@ H2ParseResult H2Context::OnPing(
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send ack of PING";
LOG(WARNING) << "Fail to send ack of PING to " << *_socket;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
return MakeH2Message(NULL);
......@@ -973,7 +997,7 @@ H2ParseResult H2Context::OnWindowUpdate(
}
if (frame_head.stream_id == 0) {
if (!AddWindowSize(&_remote_window_left, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
LOG(ERROR) << "Invalid connection-level window_size_increment=" << inc;
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
return MakeH2Message(NULL);
......@@ -984,7 +1008,8 @@ H2ParseResult H2Context::OnWindowUpdate(
return MakeH2Message(NULL);
}
if (!AddWindowSize(&sctx->_remote_window_left, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
LOG(ERROR) << "Invalid stream-level window_size_increment=" << inc
<< " to remote_window_left=" << sctx->_remote_window_left.load(butil::memory_order_relaxed);
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
return MakeH2Message(NULL);
......
......@@ -397,6 +397,8 @@ friend void InitFrameHandlers();
inline int H2Context::AllocateClientStreamId() {
if (RunOutStreams()) {
LOG(WARNING) << "Fail to allocate new client stream, _last_client_stream_id="
<< _last_client_stream_id;
return -1;
}
const int id = _last_client_stream_id;
......
......@@ -22,7 +22,7 @@ DEFINE_bool(loop, false, "run until ctrl-C is pressed");
DEFINE_bool(use_futex, false, "use futex instead of pipe");
DEFINE_bool(use_butex, false, "use butex instead of pipe");
void (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN);
void ALLOW_UNUSED (*ignore_sigpipe)(int) = signal(SIGPIPE, SIG_IGN);
volatile bool stop = false;
void quit_handler(int) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment