Commit b4f25c89 authored by zyearn's avatar zyearn Committed by gejun

Add flow control in http2

parent d6ebf1d2
...@@ -179,14 +179,19 @@ public: ...@@ -179,14 +179,19 @@ public:
HPacker& hpacker() { return _hpacker; } HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; } const H2Settings& remote_settings() const { return _remote_settings; }
const H2Settings& local_settings() const { return _local_settings; }
bool is_client_side() const { return _socket->CreatedByConnect(); } bool is_client_side() const { return _socket->CreatedByConnect(); }
bool is_server_side() const { return !is_client_side(); } bool is_server_side() const { return !is_client_side(); }
void Describe(std::ostream& os, const DescribeOptions&) const; void Describe(std::ostream& os, const DescribeOptions&) const;
void ReclaimWindowSize(int64_t);
private: private:
friend class H2StreamContext; friend class H2StreamContext;
friend class H2UnsentRequest;
friend class H2UnsentResponse;
friend void InitFrameHandlers(); friend void InitFrameHandlers();
ParseResult ConsumeFrameHead(butil::IOBufBytesIterator&, H2FrameHead*); ParseResult ConsumeFrameHead(butil::IOBufBytesIterator&, H2FrameHead*);
...@@ -226,15 +231,28 @@ friend void InitFrameHandlers(); ...@@ -226,15 +231,28 @@ friend void InitFrameHandlers();
typedef butil::FlatMap<int, H2StreamContext*> StreamMap; typedef butil::FlatMap<int, H2StreamContext*> StreamMap;
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
StreamMap _pending_streams; StreamMap _pending_streams;
butil::atomic<int64_t> _pending_conn_window_size;
}; };
inline bool add_window_size(butil::atomic<int64_t>* window_size, int64_t diff) { inline bool add_window_size(butil::atomic<int64_t>* window_size, int64_t diff) {
// A sender MUST NOT allow a flow-control window to exceed 2^31 - 1.
// If a sender receives a WINDOW_UPDATE that causes a flow-control window
// to exceed this maximum, it MUST terminate either the stream or the connection,
// as appropriate.
int64_t before_add = window_size->fetch_add(diff, butil::memory_order_relaxed); int64_t before_add = window_size->fetch_add(diff, butil::memory_order_relaxed);
if (before_add + diff > if ((((before_add | diff) >> 31) & 1) == 0) {
static_cast<int64_t>(H2Settings::MAX_INITIAL_WINDOW_SIZE)) { // two positive int64_t, check positive overflow
// being negative is OK if ((before_add + diff) & (1 << 31)) {
return false;
}
}
if ((((before_add & diff) >> 31) & 1) == 1) {
// two negative int64_t, check negaitive overflow
if (((before_add + diff) & (1 << 31)) == 0) {
return false; return false;
} }
}
// window_size being negative is OK
return true; return true;
} }
...@@ -279,7 +297,8 @@ H2Context::H2Context(Socket* socket, const Server* server) ...@@ -279,7 +297,8 @@ H2Context::H2Context(Socket* socket, const Server* server)
, _remote_conn_window_size(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE) , _remote_conn_window_size(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE)
, _conn_state(H2_CONNECTION_UNINITIALIZED) , _conn_state(H2_CONNECTION_UNINITIALIZED)
, _last_server_stream_id(-1) , _last_server_stream_id(-1)
, _last_client_stream_id(1) { , _last_client_stream_id(1)
, _pending_conn_window_size(0) {
if (server) { if (server) {
_unack_local_settings = server->options().http2_settings; _unack_local_settings = server->options().http2_settings;
} else { } else {
...@@ -309,10 +328,16 @@ H2StreamContext::H2StreamContext() ...@@ -309,10 +328,16 @@ H2StreamContext::H2StreamContext()
#endif #endif
, _stream_ended(false) , _stream_ended(false)
, _remote_window_size(0) , _remote_window_size(0)
, _local_window_size(0)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
} }
H2StreamContext::~H2StreamContext() {
int64_t diff = _conn_ctx->local_settings().initial_window_size - _local_window_size;
_conn_ctx->ReclaimWindowSize(diff);
}
int H2Context::Init() { int H2Context::Init() {
if (_pending_streams.init(64, 70) != 0) { if (_pending_streams.init(64, 70) != 0) {
LOG(ERROR) << "Fail to init _pending_streams"; LOG(ERROR) << "Fail to init _pending_streams";
...@@ -443,10 +468,14 @@ ParseResult H2Context::Consume( ...@@ -443,10 +468,14 @@ ParseResult H2Context::Consume(
} }
_conn_state = H2_CONNECTION_READY; _conn_state = H2_CONNECTION_READY;
char settingbuf[36];
_unack_local_settings.SerializeTo(settingbuf);
char headbuf[FRAME_HEAD_SIZE]; char headbuf[FRAME_HEAD_SIZE];
SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, 0, 0); SerializeFrameHead(headbuf, _unack_local_settings.ByteSize(),
H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf; butil::IOBuf buf;
buf.append(headbuf, FRAME_HEAD_SIZE); buf.append(headbuf, FRAME_HEAD_SIZE);
buf.append(settingbuf, _unack_local_settings.ByteSize());
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true; wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) { if (socket->Write(&buf, &wopt) != 0) {
...@@ -671,6 +700,30 @@ H2ParseResult H2StreamContext::OnData( ...@@ -671,6 +700,30 @@ H2ParseResult H2StreamContext::OnData(
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
} }
int64_t before_sub = _local_window_size.fetch_sub(frag_size, butil::memory_order_relaxed);
// HTTP/2 defines only the format and semantics of the WINDOW_UPDATE frame (Section 6.9).
// Spec does not stipulate how a receiver decides when to send this frame or the value
// that it sends, nor does it specify how a sender chooses to send packets.
// Implementations are able to select any algorithm that suits their needs.
if (before_sub < _conn_ctx->local_settings().initial_window_size / 3) {
int64_t old_value = _local_window_size.exchange(_conn_ctx->local_settings().initial_window_size,
butil::memory_order_relaxed);
char swinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(swinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, stream_id());
SaveUint32(swinbuf + FRAME_HEAD_SIZE, _local_window_size - old_value);
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, _local_window_size - old_value);
butil::IOBuf sendbuf;
sendbuf.append(swinbuf, sizeof(swinbuf));
sendbuf.append(cwinbuf, sizeof(cwinbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
return MakeH2Error(H2_INTERNAL_ERROR);
}
}
if (frame_head.flags & H2_FLAGS_END_STREAM) { if (frame_head.flags & H2_FLAGS_END_STREAM) {
return EndRemoteStream(); return EndRemoteStream();
} }
...@@ -772,11 +825,14 @@ H2ParseResult H2Context::OnSettings( ...@@ -772,11 +825,14 @@ H2ParseResult H2Context::OnSettings(
static_cast<int64_t>(_remote_settings.initial_window_size) static_cast<int64_t>(_remote_settings.initial_window_size)
- old_initial_window_size; - old_initial_window_size;
if (window_diff) { if (window_diff) {
add_window_size(&_remote_conn_window_size, window_diff); // Do not update the connection flow-control window here, which can only be
// changed using WINDOW_UPDATE frames.
std::unique_lock<butil::Mutex> mu(_stream_mutex); std::unique_lock<butil::Mutex> mu(_stream_mutex);
for (StreamMap::const_iterator it = _pending_streams.begin(); for (StreamMap::const_iterator it = _pending_streams.begin();
it != _pending_streams.end(); ++it) { it != _pending_streams.end(); ++it) {
add_window_size(&it->second->_remote_window_size, window_diff); if (!add_window_size(&it->second->_remote_window_size, window_diff)) {
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
} }
} }
// Respond with ack // Respond with ack
...@@ -847,19 +903,29 @@ H2ParseResult H2Context::OnWindowUpdate( ...@@ -847,19 +903,29 @@ H2ParseResult H2Context::OnWindowUpdate(
return MakeH2Error(H2_FRAME_SIZE_ERROR); return MakeH2Error(H2_FRAME_SIZE_ERROR);
} }
const uint32_t inc = LoadUint32(it); const uint32_t inc = LoadUint32(it);
if (inc & 0x80000000) { if ((inc & 0x80000000) || (inc == 0)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc; LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
LOG_EVERY_SECOND(INFO) << "Receive OnWindowUpdate, stream_id=" << frame_head.stream_id
<< ", inc=" << inc << ", _remote_conn_window_size="
<< _remote_conn_window_size;
if (frame_head.stream_id == 0) { if (frame_head.stream_id == 0) {
add_window_size(&_remote_conn_window_size, inc); if (!add_window_size(&_remote_conn_window_size, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
} else { } else {
H2StreamContext* sctx = FindStream(frame_head.stream_id); H2StreamContext* sctx = FindStream(frame_head.stream_id);
if (sctx == NULL) { if (sctx == NULL) {
LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id; LOG(ERROR) << "Fail to find stream_id=" << frame_head.stream_id;
return MakeH2Message(NULL); return MakeH2Message(NULL);
} }
add_window_size(&sctx->_remote_window_size, inc); if (!add_window_size(&sctx->_remote_window_size, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_FLOW_CONTROL_ERROR);
}
} }
return MakeH2Message(NULL); return MakeH2Message(NULL);
} }
...@@ -884,6 +950,36 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const { ...@@ -884,6 +950,36 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
} }
} }
void H2Context::ReclaimWindowSize(int64_t size) {
if (size <= 0) {
return;
}
// HTTP/2 defines only the format and semantics of the WINDOW_UPDATE frame (Section 6.9).
// Spec does not stipulate how a receiver decides when to send this frame or the value
// that it sends, nor does it specify how a sender chooses to send packets.
// Implementations are able to select any algorithm that suits their needs.
// TODO(zhujiashun): optimize the number of WINDOW_UPDATE frame
//int64_t before_add = _pending_conn_window_size.fetch_add(
// size, butil::memory_order_relaxed);
//if (before_add > local_settings().initial_window_size / 3) {
// int64_t old_value = _pending_conn_window_size.exchange(0, butil::memory_order_relaxed);
// if (old_value) {
// }
//}
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, size);
butil::IOBuf sendbuf;
sendbuf.append(cwinbuf, sizeof(cwinbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
}
}
/* /*
bvar::Adder<int64_t> g_parse_time; bvar::Adder<int64_t> g_parse_time;
bvar::PerSecond<bvar::Adder<int64_t> > g_parse_time_per_second( bvar::PerSecond<bvar::Adder<int64_t> > g_parse_time_per_second(
...@@ -951,6 +1047,8 @@ void H2StreamContext::Init(H2Context* conn_ctx, int stream_id) { ...@@ -951,6 +1047,8 @@ void H2StreamContext::Init(H2Context* conn_ctx, int stream_id) {
_conn_ctx = conn_ctx; _conn_ctx = conn_ctx;
_remote_window_size.store(conn_ctx->remote_settings().initial_window_size, _remote_window_size.store(conn_ctx->remote_settings().initial_window_size,
butil::memory_order_relaxed); butil::memory_order_relaxed);
_local_window_size.store(conn_ctx->local_settings().initial_window_size,
butil::memory_order_relaxed);
header()._h2_stream_id = stream_id; header()._h2_stream_id = stream_id;
} }
...@@ -961,6 +1059,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id) ...@@ -961,6 +1059,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
#endif #endif
, _stream_ended(false) , _stream_ended(false)
, _remote_window_size(conn_ctx->remote_settings().initial_window_size) , _remote_window_size(conn_ctx->remote_settings().initial_window_size)
, _local_window_size(conn_ctx->local_settings().initial_window_size)
, _correlation_id(INVALID_BTHREAD_ID.value) { , _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0); header().set_version(2, 0);
header()._h2_stream_id = stream_id; header()._h2_stream_id = stream_id;
...@@ -1270,17 +1369,6 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1270,17 +1369,6 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, 0, 0); SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, 0, 0);
out->append(headbuf, FRAME_HEAD_SIZE); out->append(headbuf, FRAME_HEAD_SIZE);
} }
/*
int64_t s_win = _remote_window_size.load(butil::memory_order_relaxed);
int64_t c_win = _conn_ctx->_remote_conn_window_size.load(
butil::memory_order_relaxed);
const int64_t sz = out->size();
if (sz > s_win || sz > c_win) {
return butil::Status(EAGAIN, "flow control");
}
consume_window_size(&_remote_window_size
if (out->size() < available_remote_window_size()
*/
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
if (_cntl == NULL) { if (_cntl == NULL) {
...@@ -1297,6 +1385,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1297,6 +1385,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
} }
H2StreamContext* sctx = _sctx.release(); H2StreamContext* sctx = _sctx.release();
sctx->Init(ctx, id); sctx->Init(ctx, id);
if (!ctx->TryToInsertStream(id, sctx)) { if (!ctx->TryToInsertStream(id, sctx)) {
delete sctx; delete sctx;
return butil::Status(ECANCELED, "stream_id already exists"); return butil::Status(ECANCELED, "stream_id already exists");
...@@ -1322,6 +1411,19 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1322,6 +1411,19 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBuf frag; butil::IOBuf frag;
appender.move_to(frag); appender.move_to(frag);
// flow control
int64_t s_win = sctx->_remote_window_size.load(butil::memory_order_relaxed);
int64_t c_win = ctx->_remote_conn_window_size.load(butil::memory_order_relaxed);
LOG_EVERY_SECOND(INFO) << "s_win=" << s_win << ", c_win=" << c_win;
const int64_t sz = _cntl->request_attachment().size();
if (sz > s_win || sz > c_win) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
if (!consume_window_size(&sctx->_remote_window_size, sz) ||
!consume_window_size(&ctx->_remote_conn_window_size, sz)) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
PackH2Message(out, frag, _cntl->request_attachment(), PackH2Message(out, frag, _cntl->request_attachment(),
_stream_id, ctx->remote_settings()); _stream_id, ctx->remote_settings());
return butil::Status::OK(); return butil::Status::OK();
...@@ -1451,6 +1553,13 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1451,6 +1553,13 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBuf frag; butil::IOBuf frag;
appender.move_to(frag); appender.move_to(frag);
// flow control
int64_t c_win = ctx->_remote_conn_window_size.load(butil::memory_order_relaxed);
const int64_t sz = _data.size();
if ((sz > c_win) || !consume_window_size(&ctx->_remote_conn_window_size, sz)) {
return butil::Status(EAGAIN, "Remote window size is not enough(flow control)");
}
PackH2Message(out, frag, _data, _stream_id, ctx->remote_settings()); PackH2Message(out, frag, _data, _stream_id, ctx->remote_settings());
return butil::Status::OK(); return butil::Status::OK();
} }
......
...@@ -164,6 +164,7 @@ private: ...@@ -164,6 +164,7 @@ private:
class H2StreamContext : public HttpContext { class H2StreamContext : public HttpContext {
public: public:
H2StreamContext(); H2StreamContext();
~H2StreamContext();
void Init(H2Context* conn_ctx, int stream_id); void Init(H2Context* conn_ctx, int stream_id);
H2StreamContext(H2Context* conn_ctx, int stream_id); H2StreamContext(H2Context* conn_ctx, int stream_id);
...@@ -198,6 +199,7 @@ friend class H2Context; ...@@ -198,6 +199,7 @@ friend class H2Context;
#endif #endif
bool _stream_ended; bool _stream_ended;
butil::atomic<int64_t> _remote_window_size; butil::atomic<int64_t> _remote_window_size;
butil::atomic<int64_t> _local_window_size;
uint64_t _correlation_id; uint64_t _correlation_id;
butil::IOBuf _remaining_header_fragment; butil::IOBuf _remaining_header_fragment;
}; };
......
...@@ -762,6 +762,11 @@ private: ...@@ -762,6 +762,11 @@ private:
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set; std::set<StreamId> *_stream_set;
// In some protocols, certain resources may run out according to
// protocol spec. For example, http2 streamId would run out after
// long time running and a new socket should be created. In order
// not to affect main socket, _agent_socket are introduced to
// represent the communication socket.
SocketUniquePtr _agent_socket; SocketUniquePtr _agent_socket;
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment