Commit 513ed819 authored by gejun's avatar gejun

Fix issues around WINDOW_UPDATE & rpc-timeout does not close the connection

parent 9cf3f779
......@@ -137,7 +137,7 @@ size_t H2Settings::ByteSize() const {
return size;
}
void H2Settings::SerializeTo(void* out) const {
size_t H2Settings::SerializeTo(void* out) const {
uint8_t* p = (uint8_t*)out;
if (header_table_size != DEFAULT_HEADER_TABLE_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_HEADER_TABLE_SIZE);
......@@ -169,6 +169,7 @@ void H2Settings::SerializeTo(void* out) const {
SaveUint32(p + 2, max_header_list_size);
p += 6;
}
return static_cast<size_t>(p - (uint8_t*)out);
}
void H2Settings::Print(std::ostream& os) const {
......
......@@ -90,10 +90,16 @@ struct H2Settings {
// Parse from n bytes from the iterator.
// Returns true on success.
bool ParseFrom(butil::IOBufBytesIterator&, size_t n);
// Bytes of serialized data.
size_t ByteSize() const;
// Maximum value that may be returned by ByteSize().
static const size_t MAX_BYTE_SIZE = 36;
// Serialize to `out' which is at least ByteSize() bytes long.
void SerializeTo(void* out) const;
// Returns bytes written.
size_t SerializeTo(void* out) const;
void Print(std::ostream&) const;
};
......
......@@ -44,9 +44,6 @@ DEFINE_bool(http2_hpack_encode_name, false,
DEFINE_bool(http2_hpack_encode_value, false,
"Encode value in HTTP2 headers with huffman encoding");
DEFINE_int32(http2_window_update_size, 2048,
"Initial window update size for flow control");
const char* H2StreamState2Str(H2StreamState s) {
switch (s) {
case H2_STREAM_IDLE: return "idle";
......@@ -174,14 +171,14 @@ public:
void AddAbandonedStream(uint32_t stream_id);
//@Destroyable
void Destroy() { delete this; }
void Destroy() override { delete this; }
int AllocateClientStreamId();
bool RunOutStreams() const;
// Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
uint32_t StreamSize();
uint32_t VolatilePendingStreamSize() const;
HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; }
......@@ -192,7 +189,8 @@ public:
void Describe(std::ostream& os, const DescribeOptions&) const;
void ReclaimWindowSize(int64_t);
void DeferWindowUpdate(int64_t);
int64_t ReleaseDeferredWindowUpdate();
private:
friend class H2StreamContext;
......@@ -214,17 +212,13 @@ friend void InitFrameHandlers();
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const H2FrameHead&);
H2StreamContext* RemoveStream(int stream_id);
H2StreamContext* FindStream(int stream_id, bool* closed);
H2StreamContext* FindStream(int stream_id) {
return FindStream(stream_id, NULL);
}
H2StreamContext* FindStream(int stream_id);
void ClearAbandonedStreamsImpl();
// True if the connection is established by client, otherwise it's
// accepted by server.
Socket* _socket;
butil::atomic<int64_t> _remote_conn_window_size;
butil::atomic<int64_t> _remote_window_left;
H2ConnectionState _conn_state;
int _last_server_stream_id;
uint32_t _last_client_stream_id;
......@@ -232,15 +226,15 @@ friend void InitFrameHandlers();
H2Settings _local_settings;
H2Settings _unack_local_settings;
HPacker _hpacker;
butil::Mutex _abandoned_streams_mutex;
mutable butil::Mutex _abandoned_streams_mutex;
std::vector<uint32_t> _abandoned_streams;
typedef butil::FlatMap<int, H2StreamContext*> StreamMap;
butil::Mutex _stream_mutex;
mutable butil::Mutex _stream_mutex;
StreamMap _pending_streams;
butil::atomic<int64_t> _pending_conn_window_size;
butil::atomic<int64_t> _deferred_window_update;
};
inline bool add_window_size(butil::atomic<int64_t>* window_size, int64_t diff) {
inline bool AddWindowSize(butil::atomic<int64_t>* window_size, int64_t diff) {
// A sender MUST NOT allow a flow-control window to exceed 2^31 - 1.
// If a sender receives a WINDOW_UPDATE that causes a flow-control window
// to exceed this maximum, it MUST terminate either the stream or the connection,
......@@ -262,7 +256,7 @@ inline bool add_window_size(butil::atomic<int64_t>* window_size, int64_t diff) {
return true;
}
inline bool consume_window_size(butil::atomic<int64_t>* window_size, int64_t size) {
inline bool MinusWindowSize(butil::atomic<int64_t>* window_size, int64_t size) {
if (window_size->load(butil::memory_order_relaxed) < size) {
// false negative is OK.
return false;
......@@ -300,11 +294,11 @@ inline H2Context::FrameHandler FindFrameHandler(H2FrameType type) {
H2Context::H2Context(Socket* socket, const Server* server)
: _socket(socket)
, _remote_conn_window_size(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE)
, _remote_window_left(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE)
, _conn_state(H2_CONNECTION_UNINITIALIZED)
, _last_server_stream_id(-1)
, _last_client_stream_id(1)
, _pending_conn_window_size(0) {
, _deferred_window_update(0) {
if (server) {
_unack_local_settings = server->options().http2_settings;
} else {
......@@ -329,12 +323,12 @@ H2Context::~H2Context() {
H2StreamContext::H2StreamContext()
: _conn_ctx(NULL)
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
, _state(H2_STREAM_IDLE)
#endif
, _stream_ended(false)
, _remote_window_size(0)
, _local_window_size(0)
, _remote_window_left(0)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
#ifndef NDEBUG
......@@ -343,15 +337,33 @@ H2StreamContext::H2StreamContext()
}
H2StreamContext::~H2StreamContext() {
if (_conn_ctx) {
int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size;
_conn_ctx->ReclaimWindowSize(diff);
}
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << -1;
#endif
}
bool H2StreamContext::ConsumeWindowSize(int64_t size) {
// This method is guaranteed to be called in AppendAndDestroySelf() which
// is run sequentially. As a result, _remote_window_left of this stream
// context will not be decremented (may be incremented) because following
// AppendAndDestroySelf() are not run yet.
// This fact is important to make window_size changes to stream and
// connection contexts transactionally.
if (_remote_window_left.load(butil::memory_order_relaxed) < size) {
return false;
}
if (!MinusWindowSize(&_conn_ctx->_remote_window_left, size)) {
return false;
}
int64_t after_sub = _remote_window_left.fetch_sub(size, butil::memory_order_relaxed) - size;
if (after_sub < 0) {
LOG(FATAL) << "Impossible, the http2 impl is buggy";
_remote_window_left.fetch_add(size, butil::memory_order_relaxed);
return false;
}
return true;
}
int H2Context::Init() {
if (_pending_streams.init(64, 70) != 0) {
LOG(ERROR) << "Fail to init _pending_streams";
......@@ -386,7 +398,7 @@ H2StreamContext* H2Context::RemoveStream(int stream_id) {
return NULL;
}
H2StreamContext* H2Context::FindStream(int stream_id, bool* closed) {
H2StreamContext* H2Context::FindStream(int stream_id) {
{
std::unique_lock<butil::Mutex> mu(_stream_mutex);
H2StreamContext** psctx = _pending_streams.seek(stream_id);
......@@ -394,11 +406,13 @@ H2StreamContext* H2Context::FindStream(int stream_id, bool* closed) {
return *psctx;
}
}
/*
if (closed) {
const uint32_t limit = is_client_side() ? _last_client_stream_id
: (uint32_t)_last_server_stream_id;
*closed = ((uint32_t)stream_id < limit);
}
*/
return NULL;
}
......@@ -412,8 +426,7 @@ bool H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) {
return false;
}
uint32_t H2Context::StreamSize() {
std::unique_lock<butil::Mutex> mu(_stream_mutex);
inline uint32_t H2Context::VolatilePendingStreamSize() const {
return _pending_streams.size();
}
......@@ -483,14 +496,11 @@ ParseResult H2Context::Consume(
}
_conn_state = H2_CONNECTION_READY;
char settingbuf[36];
_unack_local_settings.SerializeTo(settingbuf);
char headbuf[FRAME_HEAD_SIZE];
SerializeFrameHead(headbuf, _unack_local_settings.ByteSize(),
H2_FRAME_SETTINGS, 0, 0);
char settingsbuf[FRAME_HEAD_SIZE + H2Settings::MAX_BYTE_SIZE];
const size_t nb = _unack_local_settings.SerializeTo(settingsbuf + FRAME_HEAD_SIZE);
SerializeFrameHead(settingsbuf, nb, H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(headbuf, FRAME_HEAD_SIZE);
buf.append(settingbuf, _unack_local_settings.ByteSize());
buf.append(settingsbuf, FRAME_HEAD_SIZE + nb);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) {
......@@ -604,8 +614,13 @@ H2ParseResult H2Context::OnHeaders(
} else {
sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
LOG(ERROR) << "stream_id=" << frame_head.stream_id
<< " does not exist";
if (is_client_side()) {
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(this, frame_head.stream_id);
tmp_sctx.OnHeaders(it, frame_head, frag_size, pad_length);
return MakeH2Message(NULL);
}
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
}
......@@ -616,7 +631,7 @@ H2ParseResult H2StreamContext::OnHeaders(
butil::IOBufBytesIterator& it, const H2FrameHead& frame_head,
uint32_t frag_size, uint8_t pad_length) {
_parsed_length += FRAME_HEAD_SIZE + frame_head.payload_size;
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
SetState(H2_STREAM_OPEN);
#endif
butil::IOBufBytesIterator it2(it, frag_size);
......@@ -640,18 +655,25 @@ H2ParseResult H2StreamContext::OnHeaders(
return EndRemoteStream();
}
return MakeH2Message(NULL);
}
} else {
if (frame_head.flags & H2_FLAGS_END_STREAM) {
// Delay calling EndRemoteStream() in OnContinuation()
_stream_ended = true;
}
return MakeH2Message(NULL);
}
}
H2ParseResult H2Context::OnContinuation(
butil::IOBufBytesIterator& it, const H2FrameHead& frame_head) {
H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
if (is_client_side()) {
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(this, frame_head.stream_id);
tmp_sctx.OnContinuation(it, frame_head);
return MakeH2Message(NULL);
}
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
......@@ -697,6 +719,13 @@ H2ParseResult H2Context::OnData(
frag_size -= pad_length;
H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
if (is_client_side()) {
// Ignore the message without closing the socket.
H2StreamContext tmp_sctx(this, frame_head.stream_id);
tmp_sctx.OnData(it, frame_head, frag_size, pad_length);
DeferWindowUpdate(tmp_sctx.ReleaseDeferredWindowUpdate());
return MakeH2Message(NULL);
}
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Message(NULL);
}
......@@ -717,23 +746,23 @@ H2ParseResult H2StreamContext::OnData(
return MakeH2Error(H2_PROTOCOL_ERROR);
}
}
int64_t before_sub = _local_window_size.fetch_sub(frag_size, butil::memory_order_relaxed);
// HTTP/2 defines only the format and semantics of the WINDOW_UPDATE frame (Section 6.9).
// Spec does not stipulate how a receiver decides when to send this frame or the value
// that it sends, nor does it specify how a sender chooses to send packets.
// Implementations are able to select any algorithm that suits their needs.
if (before_sub < _conn_ctx->local_settings().initial_window_size / 3) {
int64_t old_value = _local_window_size.exchange(_conn_ctx->local_settings().initial_window_size,
butil::memory_order_relaxed);
char swinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(swinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, stream_id());
SaveUint32(swinbuf + FRAME_HEAD_SIZE, _local_window_size - old_value);
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, _local_window_size - old_value);
const int64_t acc = _deferred_window_update.fetch_add(frag_size, butil::memory_order_relaxed) + frag_size;
if (acc >= _conn_ctx->local_settings().initial_window_size / 2) {
// Rarely happen for small messages.
const int64_t stream_wu =
_deferred_window_update.exchange(0, butil::memory_order_relaxed);
const int64_t conn_wu = stream_wu + _conn_ctx->ReleaseDeferredWindowUpdate();
char winbuf[(FRAME_HEAD_SIZE + 4) * 2];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, stream_id());
SaveUint32(winbuf + FRAME_HEAD_SIZE, stream_wu);
char* cwin = winbuf + FRAME_HEAD_SIZE + 4;
SerializeFrameHead(cwin, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwin + FRAME_HEAD_SIZE, conn_wu);
butil::IOBuf sendbuf;
sendbuf.append(swinbuf, sizeof(swinbuf));
sendbuf.append(cwinbuf, sizeof(cwinbuf));
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
......@@ -765,7 +794,7 @@ H2ParseResult H2Context::OnResetStream(
H2ParseResult H2StreamContext::OnResetStream(
H2Error h2_error, const H2FrameHead& frame_head) {
_parsed_length += FRAME_HEAD_SIZE + frame_head.payload_size;
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
if (state() == H2_STREAM_OPEN) {
SetState(H2_STREAM_HALF_CLOSED_REMOTE);
} else if (state() == H2_STREAM_HALF_CLOSED_LOCAL) {
......@@ -793,7 +822,7 @@ H2ParseResult H2StreamContext::OnResetStream(
}
H2ParseResult H2StreamContext::EndRemoteStream() {
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
if (state() == H2_STREAM_OPEN) {
SetState(H2_STREAM_HALF_CLOSED_REMOTE);
} else if (state() == H2_STREAM_HALF_CLOSED_LOCAL) {
......@@ -804,12 +833,16 @@ H2ParseResult H2StreamContext::EndRemoteStream() {
return MakeH2Error(H2_PROTOCOL_ERROR);
}
#endif
OnMessageComplete();
H2StreamContext* sctx = _conn_ctx->RemoveStream(stream_id());
if (sctx == NULL) {
LOG(ERROR) << "Fail to find stream_id=" << stream_id();
return MakeH2Error(H2_PROTOCOL_ERROR);
return MakeH2Message(NULL);
}
// The remote stream will not send any more data, sending back the
// stream-level WINDOW_UPDATE is pointless, just move the value into
// the connection.
_conn_ctx->DeferWindowUpdate(sctx->ReleaseDeferredWindowUpdate());
OnMessageComplete();
return MakeH2Message(sctx);
}
......@@ -842,12 +875,14 @@ H2ParseResult H2Context::OnSettings(
static_cast<int64_t>(_remote_settings.initial_window_size)
- old_initial_window_size;
if (window_diff) {
// Do not update the connection flow-control window here, which can only be
// changed using WINDOW_UPDATE frames.
// Do not update the connection flow-control window here, which can only
// be changed using WINDOW_UPDATE frames.
// https://tools.ietf.org/html/rfc7540#section-6.9.2
// TODO(gejun): Has race conditions with AppendAndDestroySelf
std::unique_lock<butil::Mutex> mu(_stream_mutex);
for (StreamMap::const_iterator it = _pending_streams.begin();
it != _pending_streams.end(); ++it) {
if (!add_window_size(&it->second->_remote_window_size, window_diff)) {
if (!AddWindowSize(&it->second->_remote_window_left, window_diff)) {
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
}
......@@ -926,17 +961,17 @@ H2ParseResult H2Context::OnWindowUpdate(
return MakeH2Error(H2_PROTOCOL_ERROR);
}
if (frame_head.stream_id == 0) {
if (!add_window_size(&_remote_conn_window_size, inc)) {
if (!AddWindowSize(&_remote_window_left, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
} else {
H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) {
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
LOG(WARNING) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Message(NULL);
}
if (!add_window_size(&sctx->_remote_window_size, inc)) {
if (!AddWindowSize(&sctx->_remote_window_left, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
......@@ -949,7 +984,8 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
os << '\n';
}
const char sep = (opt.verbose ? '\n' : ' ');
os << "remote_conn_window_size=" << _remote_conn_window_size
os << "remote_window_left=" << _remote_window_left.load(butil::memory_order_relaxed)
<< sep << "deferred_window_update=" << _deferred_window_update.load(butil::memory_order_relaxed)
<< sep << "conn_state=" << H2ConnectionState2Str(_conn_state);
if (is_server_side()) {
os << sep << "last_server_stream_id=" << _last_server_stream_id;
......@@ -962,30 +998,40 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
IndentingOStream os2(os, 2);
_hpacker.Describe(os2, opt);
os << '}';
size_t abandoned_size = 0;
{
BAIDU_SCOPED_LOCK(_abandoned_streams_mutex);
abandoned_size = _abandoned_streams.size();
}
os << sep << "abandoned_streams=" << abandoned_size
<< sep << "pending_streams=" << VolatilePendingStreamSize();
if (opt.verbose) {
os << '\n';
}
}
void H2Context::ReclaimWindowSize(int64_t size) {
inline int64_t H2Context::ReleaseDeferredWindowUpdate() {
if (_deferred_window_update.load(butil::memory_order_relaxed) == 0) {
return 0;
}
return _deferred_window_update.exchange(0, butil::memory_order_relaxed);
}
void H2Context::DeferWindowUpdate(int64_t size) {
if (size <= 0) {
return;
}
// HTTP/2 defines only the format and semantics of the WINDOW_UPDATE frame (Section 6.9).
// Spec does not stipulate how a receiver decides when to send this frame or the value
// that it sends, nor does it specify how a sender chooses to send packets.
// Implementations are able to select any algorithm that suits their needs.
int64_t window_update_size = 0;
if (_pending_conn_window_size.fetch_add(size) > FLAGS_http2_window_update_size) {
window_update_size =
_pending_conn_window_size.exchange(0, butil::memory_order_relaxed);
}
if (window_update_size > 0) {
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, window_update_size);
const int64_t acc = _deferred_window_update.fetch_add(size, butil::memory_order_relaxed) + size;
if (acc >= local_settings().initial_window_size / 2) {
// Rarely happen for small messages.
const int64_t conn_wu = _deferred_window_update.exchange(0, butil::memory_order_relaxed);
char winbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu);
butil::IOBuf sendbuf;
sendbuf.append(cwinbuf, sizeof(cwinbuf));
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
......@@ -994,15 +1040,17 @@ void H2Context::ReclaimWindowSize(int64_t size) {
}
}
/*
#if defined(BRPC_PROFILE_H2)
bvar::Adder<int64_t> g_parse_time;
bvar::PerSecond<bvar::Adder<int64_t> > g_parse_time_per_second(
"h2_parse_second", &g_parse_time);
*/
#endif
ParseResult ParseH2Message(butil::IOBuf *source, Socket *socket,
bool read_eof, const void *arg) {
//bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_parse_time);
#if defined(BRPC_PROFILE_H2)
bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_parse_time);
#endif
H2Context* ctx = static_cast<H2Context*>(socket->parsing_context());
if (ctx == NULL) {
if (read_eof || source->empty()) {
......@@ -1060,21 +1108,19 @@ void H2Context::ClearAbandonedStreamsImpl() {
void H2StreamContext::Init(H2Context* conn_ctx, int stream_id) {
_conn_ctx = conn_ctx;
_remote_window_size.store(conn_ctx->remote_settings().initial_window_size,
butil::memory_order_relaxed);
_local_window_size.store(conn_ctx->local_settings().initial_window_size,
_remote_window_left.store(conn_ctx->remote_settings().initial_window_size,
butil::memory_order_relaxed);
header()._h2_stream_id = stream_id;
}
H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
: _conn_ctx(conn_ctx)
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
, _state(H2_STREAM_IDLE)
#endif
, _stream_ended(false)
, _remote_window_size(conn_ctx->remote_settings().initial_window_size)
, _local_window_size(conn_ctx->local_settings().initial_window_size)
, _remote_window_left(conn_ctx->remote_settings().initial_window_size)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
header()._h2_stream_id = stream_id;
......@@ -1083,7 +1129,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
#endif
}
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
void H2StreamContext::SetState(H2StreamState state) {
const H2StreamState old_state = _state;
_state = state;
......@@ -1187,7 +1233,8 @@ static void PackH2Message(butil::IOBuf* out,
butil::IOBuf& headers,
const butil::IOBuf& data,
int stream_id,
const H2Settings& remote_settings) {
H2Context* conn_ctx) {
const H2Settings& remote_settings = conn_ctx->remote_settings();
char headbuf[FRAME_HEAD_SIZE];
H2FrameHead headers_head = {
(uint32_t)headers.size(), H2_FRAME_HEADERS, 0, stream_id};
......@@ -1233,6 +1280,13 @@ static void PackH2Message(butil::IOBuf* out,
it.append_and_forward(out, data_head.payload_size);
}
}
const int64_t conn_wu = conn_ctx->ReleaseDeferredWindowUpdate();
if (conn_wu > 0) {
char winbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu);
out->append(winbuf, sizeof(winbuf));
}
}
H2UnsentRequest* H2UnsentRequest::New(Controller* c) {
......@@ -1349,13 +1403,17 @@ void H2UnsentRequest::DestroyStreamUserData(SocketUniquePtr& sending_sock,
}
}
// bvar::Adder<int64_t> g_append_request_time;
// bvar::PerSecond<bvar::Adder<int64_t> > g_append_request_time_per_second(
// "h2_append_request_second", &g_append_request_time);
#if defined(BRPC_PROFILE_H2)
bvar::Adder<int64_t> g_append_request_time;
bvar::PerSecond<bvar::Adder<int64_t> > g_append_request_time_per_second(
"h2_append_request_second", &g_append_request_time);
#endif
butil::Status
H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
//bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_append_request_time);
#if defined(BRPC_PROFILE_H2)
bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_append_request_time);
#endif
RemoveRefOnQuit deref_self(this);
if (socket == NULL) {
return butil::Status::OK();
......@@ -1375,9 +1433,18 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
// Append client connection preface
out->append(H2_CONNECTION_PREFACE_PREFIX,
H2_CONNECTION_PREFACE_PREFIX_SIZE);
char headbuf[FRAME_HEAD_SIZE];
SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, 0, 0);
out->append(headbuf, FRAME_HEAD_SIZE);
char settingsbuf[FRAME_HEAD_SIZE + H2Settings::MAX_BYTE_SIZE];
const size_t nb = ctx->_unack_local_settings.SerializeTo(
settingsbuf + FRAME_HEAD_SIZE);
SerializeFrameHead(settingsbuf, nb, H2_FRAME_SETTINGS, 0, 0);
out->append(settingsbuf, FRAME_HEAD_SIZE + nb);
}
// FIXME(gejun): Replace EAGAIN
// TODO(zhujiashun): also check this in server push
if (ctx->VolatilePendingStreamSize() > ctx->remote_settings().max_concurrent_streams) {
return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream");
}
// Although the critical section looks huge, it should rarely be contended
......@@ -1387,11 +1454,6 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
return butil::Status(ECANCELED, "The RPC was already failed");
}
// TODO(zhujiashun): also check this in server push
if (ctx->StreamSize() > ctx->remote_settings().max_concurrent_streams) {
return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream");
}
const int id = ctx->AllocateClientStreamId();
if (id < 0) {
// The RPC should be failed and retried.
......@@ -1409,6 +1471,14 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}
_stream_id = sctx->stream_id();
// flow control
if (!_cntl->request_attachment().empty()) {
const int64_t data_size = _cntl->request_attachment().size();
if (!sctx->ConsumeWindowSize(data_size)) {
return butil::Status(EAGAIN, "remote_window_left is not enough, data_size=%" PRId64, data_size);
}
}
HPacker& hpacker = ctx->hpacker();
butil::IOBufAppender appender;
HPackOptions options;
......@@ -1427,21 +1497,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}
butil::IOBuf frag;
appender.move_to(frag);
// flow control
int64_t s_win = sctx->_remote_window_size.load(butil::memory_order_relaxed);
int64_t c_win = ctx->_remote_conn_window_size.load(butil::memory_order_relaxed);
const int64_t sz = _cntl->request_attachment().size();
if (sz > s_win || sz > c_win) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
if (!consume_window_size(&sctx->_remote_window_size, sz) ||
!consume_window_size(&ctx->_remote_conn_window_size, sz)) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
PackH2Message(out, frag, _cntl->request_attachment(),
_stream_id, ctx->remote_settings());
PackH2Message(out, frag, _cntl->request_attachment(), _stream_id, ctx);
return butil::Status::OK();
}
......@@ -1537,19 +1593,31 @@ void H2UnsentResponse::Destroy() {
free(this);
}
// bvar::Adder<int64_t> g_append_response_time;
// bvar::PerSecond<bvar::Adder<int64_t> > g_append_response_time_per_second(
// "h2_append_response_second", &g_append_response_time);
#if defined(BRPC_PROFILE_H2)
bvar::Adder<int64_t> g_append_response_time;
bvar::PerSecond<bvar::Adder<int64_t> > g_append_response_time_per_second(
"h2_append_response_second", &g_append_response_time);
#endif
butil::Status
H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
//bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_append_response_time);
#if defined(BRPC_PROFILE_H2)
bvar::ScopedTimer<bvar::Adder<int64_t> > tm(g_append_response_time);
#endif
DestroyingPtr<H2UnsentResponse> destroy_self(this);
if (socket == NULL) {
return butil::Status::OK();
}
H2Context* ctx = static_cast<H2Context*>(socket->parsing_context());
// flow control
// NOTE: Currently the stream context is definitely removed and updating
// window size is useless, however it's not true when progressive request
// is supported.
if (!MinusWindowSize(&ctx->_remote_window_left, _data.size())) {
return butil::Status(EAGAIN, "Remote window size is not enough");
}
HPacker& hpacker = ctx->hpacker();
butil::IOBufAppender appender;
HPackOptions options;
......@@ -1569,14 +1637,7 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBuf frag;
appender.move_to(frag);
// flow control
int64_t c_win = ctx->_remote_conn_window_size.load(butil::memory_order_relaxed);
const int64_t sz = _data.size();
if ((sz > c_win) || !consume_window_size(&ctx->_remote_conn_window_size, sz)) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
PackH2Message(out, frag, _data, _stream_id, ctx->remote_settings());
PackH2Message(out, frag, _data, _stream_id, ctx);
return butil::Status::OK();
}
......
......@@ -24,7 +24,10 @@
#include "brpc/details/hpack.h"
#include "brpc/stream_creator.h"
#include "brpc/controller.h"
#ifndef NDEBUG
#include "bvar/bvar.h"
#endif
namespace brpc {
......@@ -159,8 +162,8 @@ public:
void Destroy();
void Describe(butil::IOBuf*) const;
// @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
size_t EstimatedByteSize();
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*) override;
size_t EstimatedByteSize() override;
private:
std::string& push(const std::string& name)
......@@ -214,19 +217,28 @@ public:
size_t parsed_length() const { return this->_parsed_length; }
int stream_id() const { return header().h2_stream_id(); }
#ifdef HAS_H2_STREAM_STATE
int64_t ReleaseDeferredWindowUpdate() {
if (_deferred_window_update.load(butil::memory_order_relaxed) == 0) {
return 0;
}
return _deferred_window_update.exchange(0, butil::memory_order_relaxed);
}
bool ConsumeWindowSize(int64_t size);
#if defined(BRPC_H2_STREAM_STATE)
H2StreamState state() const { return _state; }
void SetState(H2StreamState state);
#endif
friend class H2Context;
H2Context* _conn_ctx;
#ifdef HAS_H2_STREAM_STATE
#if defined(BRPC_H2_STREAM_STATE)
H2StreamState _state;
#endif
bool _stream_ended;
butil::atomic<int64_t> _remote_window_size;
butil::atomic<int64_t> _local_window_size;
butil::atomic<int64_t> _remote_window_left;
butil::atomic<int64_t> _deferred_window_update;
uint64_t _correlation_id;
butil::IOBuf _remaining_header_fragment;
};
......@@ -250,7 +262,6 @@ protected:
};
} // namespace policy
} // namespace brpc
#endif // BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment