Commit 61b594af authored by gejun's avatar gejun

Separate window_size of connections from streams and fix WU bugs

parent 8994b65a
......@@ -23,17 +23,39 @@ H2Settings::H2Settings()
: header_table_size(DEFAULT_HEADER_TABLE_SIZE)
, enable_push(false)
, max_concurrent_streams(std::numeric_limits<uint32_t>::max())
, initial_window_size(DEFAULT_INITIAL_WINDOW_SIZE)
, stream_window_size(256 * 1024)
, connection_window_size(1024 * 1024)
, max_frame_size(DEFAULT_MAX_FRAME_SIZE)
, max_header_list_size(std::numeric_limits<uint32_t>::max()) {
}
bool H2Settings::IsValid(bool log_error) const {
if (stream_window_size > MAX_WINDOW_SIZE) {
LOG_IF(ERROR, log_error) << "Invalid stream_window_size=" << stream_window_size;
return false;
}
if (connection_window_size < DEFAULT_INITIAL_WINDOW_SIZE ||
connection_window_size > MAX_WINDOW_SIZE) {
LOG_IF(ERROR, log_error) << "Invalid connection_window_size=" << connection_window_size;
return false;
}
if (max_frame_size < DEFAULT_MAX_FRAME_SIZE ||
max_frame_size > MAX_OF_MAX_FRAME_SIZE) {
LOG_IF(ERROR, log_error) << "Invalid max_frame_size=" << max_frame_size;
return false;
}
return true;
}
std::ostream& operator<<(std::ostream& os, const H2Settings& s) {
os << "{header_table_size=" << s.header_table_size
<< " enable_push=" << s.enable_push
<< " max_concurrent_streams=" << s.max_concurrent_streams
<< " initial_window_size=" << s.initial_window_size
<< " max_frame_size=" << s.max_frame_size
<< " stream_window_size=" << s.stream_window_size;
if (s.connection_window_size > 0) {
os << " conn_window_size=" << s.connection_window_size;
}
os << " max_frame_size=" << s.max_frame_size
<< " max_header_list_size=" << s.max_header_list_size
<< '}';
return os;
......
......@@ -26,6 +26,9 @@ struct H2Settings {
// Construct with default values.
H2Settings();
// Returns true iff all options are valid.
bool IsValid(bool log_error = false) const;
// Allows the sender to inform the remote endpoint of the maximum size of
// the header compression table used to decode header blocks, in octets.
// The encoder can select any size equal to or less than this value by
......@@ -41,7 +44,7 @@ struct H2Settings {
// parameter to 0 and had it acknowledged MUST treat the receipt of a
// PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR.
// Default: true (server push is permitted)
// Default: false (server push is disabled)
static const bool DEFAULT_ENABLE_PUSH = true;
bool enable_push;
......@@ -60,10 +63,15 @@ struct H2Settings {
// This setting affects the window size of all streams (see Section 6.9.2).
// Values above the maximum flow-control window size of 2^31-1 are treated
// as a connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR
// Default: 65535
// Default: 256 * 1024
static const uint32_t DEFAULT_INITIAL_WINDOW_SIZE = 65535;
static const uint32_t MAX_INITIAL_WINDOW_SIZE = (1u << 31) - 1;
uint32_t initial_window_size;
static const uint32_t MAX_WINDOW_SIZE = (1u << 31) - 1;
uint32_t stream_window_size;
// Initial window size for connection-level flow control.
// Default: 1024 * 1024
// Setting to zero stops printing this field.
uint32_t connection_window_size;
// Size of the largest frame payload that the sender is willing to receive,
// in octets. The value advertised by an endpoint MUST be between 16384 and
......
......@@ -32,9 +32,10 @@ namespace policy {
DEFINE_int32(http2_client_header_table_size,
H2Settings::DEFAULT_HEADER_TABLE_SIZE,
"maximum size of compression tables for decoding headers");
DEFINE_int32(http2_client_initial_window_size,
H2Settings::DEFAULT_INITIAL_WINDOW_SIZE,
"Initial window size for flow control");
DEFINE_int32(http2_client_stream_window_size, 256 * 1024,
"Initial window size for stream-level flow control");
DEFINE_int32(http2_client_connection_window_size, 1024 * 1024,
"Initial window size for connection-level flow control");
DEFINE_int32(http2_client_max_frame_size,
H2Settings::DEFAULT_MAX_FRAME_SIZE,
"Size of the largest frame payload that client is willing to receive");
......@@ -44,6 +45,16 @@ DEFINE_bool(http2_hpack_encode_name, false,
DEFINE_bool(http2_hpack_encode_value, false,
"Encode value in HTTP2 headers with huffman encoding");
static bool CheckStreamWindowSize(const char*, int32_t val) {
return val >= 0;
}
BRPC_VALIDATE_GFLAG(http2_client_stream_window_size, CheckStreamWindowSize);
static bool CheckConnWindowSize(const char*, int32_t val) {
return val >= (int32_t)H2Settings::DEFAULT_INITIAL_WINDOW_SIZE;
}
BRPC_VALIDATE_GFLAG(http2_client_connection_window_size, CheckConnWindowSize);
const char* H2StreamState2Str(H2StreamState s) {
switch (s) {
case H2_STREAM_IDLE: return "idle";
......@@ -111,6 +122,8 @@ const uint8_t H2_FLAGS_PRIORITY = 0x20;
#define H2_CONNECTION_PREFACE_PREFIX "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
const size_t H2_CONNECTION_PREFACE_PREFIX_SIZE = 24;
const size_t FRAME_HEAD_SIZE = 9;
// https://tools.ietf.org/html/rfc7540#section-4.1
struct H2FrameHead {
// The length of the frame payload expressed as an unsigned 24-bit integer.
......@@ -134,6 +147,26 @@ struct H2FrameHead {
int stream_id;
};
static void SerializeFrameHead(void* out_buf, uint32_t payload_size,
H2FrameType type, uint8_t flags,
uint32_t stream_id) {
uint8_t* p = (uint8_t*)out_buf;
*p++ = (payload_size >> 16) & 0xFF;
*p++ = (payload_size >> 8) & 0xFF;
*p++ = payload_size & 0xFF;
*p++ = (uint8_t)type;
*p++ = flags;
*p++ = (stream_id >> 24) & 0xFF;
*p++ = (stream_id >> 16) & 0xFF;
*p++ = (stream_id >> 8) & 0xFF;
*p++ = stream_id & 0xFF;
}
inline void SerializeFrameHead(void* out_buf, const H2FrameHead& h) {
return SerializeFrameHead(out_buf, h.payload_size, h.type,
h.flags, h.stream_id);
}
static void InitFrameHandlers();
// [ https://tools.ietf.org/html/rfc7540#section-6.5.1 ]
......@@ -142,7 +175,7 @@ enum H2SettingsIdentifier {
HTTP2_SETTINGS_HEADER_TABLE_SIZE = 0x1,
HTTP2_SETTINGS_ENABLE_PUSH = 0x2,
HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = 0x3,
HTTP2_SETTINGS_INITIAL_WINDOW_SIZE = 0x4,
HTTP2_SETTINGS_STREAM_WINDOW_SIZE = 0x4,
HTTP2_SETTINGS_MAX_FRAME_SIZE = 0x5,
HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE = 0x6
};
......@@ -172,12 +205,12 @@ bool ParseH2Settings(H2Settings* out, butil::IOBufBytesIterator& it, size_t n) {
case HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS:
out->max_concurrent_streams = value;
break;
case HTTP2_SETTINGS_INITIAL_WINDOW_SIZE:
if (value > H2Settings::MAX_INITIAL_WINDOW_SIZE) {
LOG(ERROR) << "Invalid initial_window_size=" << value;
case HTTP2_SETTINGS_STREAM_WINDOW_SIZE:
if (value > H2Settings::MAX_WINDOW_SIZE) {
LOG(ERROR) << "Invalid stream_window_size=" << value;
return false;
}
out->initial_window_size = value;
out->stream_window_size = value;
break;
case HTTP2_SETTINGS_MAX_FRAME_SIZE:
if (value > H2Settings::MAX_OF_MAX_FRAME_SIZE ||
......@@ -222,9 +255,9 @@ size_t SerializeH2Settings(const H2Settings& in, void* out) {
SaveUint32(p + 2, in.max_concurrent_streams);
p += 6;
}
if (in.initial_window_size != H2Settings::DEFAULT_INITIAL_WINDOW_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
SaveUint32(p + 2, in.initial_window_size);
if (in.stream_window_size != H2Settings::DEFAULT_INITIAL_WINDOW_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_STREAM_WINDOW_SIZE);
SaveUint32(p + 2, in.stream_window_size);
p += 6;
}
if (in.max_frame_size != H2Settings::DEFAULT_MAX_FRAME_SIZE) {
......@@ -240,6 +273,20 @@ size_t SerializeH2Settings(const H2Settings& in, void* out) {
return static_cast<size_t>(p - (uint8_t*)out);
}
static size_t SerializeH2SettingsFrameAndWU(const H2Settings& in, void* out) {
uint8_t* p = (uint8_t*)out;
size_t nb = SerializeH2Settings(in, p + FRAME_HEAD_SIZE);
SerializeFrameHead(p, nb, H2_FRAME_SETTINGS, 0, 0);
p += FRAME_HEAD_SIZE + nb;
if (in.connection_window_size > H2Settings::DEFAULT_INITIAL_WINDOW_SIZE) {
SerializeFrameHead(p, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(p + FRAME_HEAD_SIZE,
in.connection_window_size - H2Settings::DEFAULT_INITIAL_WINDOW_SIZE);
p += FRAME_HEAD_SIZE + 4;
}
return static_cast<size_t>(p - (uint8_t*)out);
}
// Contexts of a http2 connection
class H2Context : public Destroyable, public Describable {
public:
......@@ -388,12 +435,15 @@ H2Context::H2Context(Socket* socket, const Server* server)
, _last_server_stream_id(-1)
, _last_client_stream_id(1)
, _deferred_window_update(0) {
// Stop printing the field which is useless for remote settings.
_remote_settings.connection_window_size = 0;
if (server) {
_unack_local_settings = server->options().http2_settings;
_unack_local_settings = server->options().h2_settings;
} else {
_unack_local_settings.header_table_size = FLAGS_http2_client_header_table_size;
_unack_local_settings.initial_window_size = FLAGS_http2_client_initial_window_size;
_unack_local_settings.stream_window_size = FLAGS_http2_client_stream_window_size;
_unack_local_settings.max_frame_size = FLAGS_http2_client_max_frame_size;
_unack_local_settings.connection_window_size = FLAGS_http2_client_connection_window_size;
}
#if defined(UNIT_TEST)
// In ut, we hope _last_client_stream_id run out quickly to test the correctness
......@@ -510,8 +560,6 @@ inline uint32_t H2Context::VolatilePendingStreamSize() const {
return _pending_streams.size();
}
const size_t FRAME_HEAD_SIZE = 9;
ParseResult H2Context::ConsumeFrameHead(
butil::IOBufBytesIterator& it, H2FrameHead* frame_head) {
uint8_t length_buf[3];
......@@ -541,26 +589,6 @@ ParseResult H2Context::ConsumeFrameHead(
return MakeMessage(NULL);
}
void SerializeFrameHead(void* out_buf, uint32_t payload_size,
H2FrameType type, uint8_t flags,
uint32_t stream_id) {
uint8_t* p = (uint8_t*)out_buf;
*p++ = (payload_size >> 16) & 0xFF;
*p++ = (payload_size >> 8) & 0xFF;
*p++ = payload_size & 0xFF;
*p++ = (uint8_t)type;
*p++ = flags;
*p++ = (stream_id >> 24) & 0xFF;
*p++ = (stream_id >> 16) & 0xFF;
*p++ = (stream_id >> 8) & 0xFF;
*p++ = stream_id & 0xFF;
}
inline void SerializeFrameHead(void* out_buf, const H2FrameHead& h) {
return SerializeFrameHead(out_buf, h.payload_size, h.type,
h.flags, h.stream_id);
}
ParseResult H2Context::Consume(
butil::IOBufBytesIterator& it, Socket* socket) {
if (_conn_state == H2_CONNECTION_UNINITIALIZED) {
......@@ -576,12 +604,11 @@ ParseResult H2Context::Consume(
}
_conn_state = H2_CONNECTION_READY;
char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE];
const size_t nb = SerializeH2Settings(
_unack_local_settings, settingsbuf + FRAME_HEAD_SIZE);
SerializeFrameHead(settingsbuf, nb, H2_FRAME_SETTINGS, 0, 0);
char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE +
FRAME_HEAD_SIZE + 4/*for WU*/];
const size_t nb = SerializeH2SettingsFrameAndWU(_unack_local_settings, settingsbuf);
butil::IOBuf buf;
buf.append(settingsbuf, FRAME_HEAD_SIZE + nb);
buf.append(settingsbuf, nb);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) {
......@@ -828,27 +855,31 @@ H2ParseResult H2StreamContext::OnData(
}
}
const int64_t acc = _deferred_window_update.fetch_add(frag_size, butil::memory_order_relaxed) + frag_size;
if (acc >= _conn_ctx->local_settings().initial_window_size / 2) {
if (acc >= _conn_ctx->local_settings().stream_window_size / 2) {
// Rarely happen for small messages.
const int64_t stream_wu =
_deferred_window_update.exchange(0, butil::memory_order_relaxed);
const int64_t conn_wu = stream_wu + _conn_ctx->ReleaseDeferredWindowUpdate();
char winbuf[(FRAME_HEAD_SIZE + 4) * 2];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, stream_id());
SaveUint32(winbuf + FRAME_HEAD_SIZE, stream_wu);
char* cwin = winbuf + FRAME_HEAD_SIZE + 4;
SerializeFrameHead(cwin, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwin + FRAME_HEAD_SIZE, conn_wu);
if (stream_wu > 0) {
char winbuf[(FRAME_HEAD_SIZE + 4) * 2];
char* p = winbuf;
SerializeFrameHead(p, 4, H2_FRAME_WINDOW_UPDATE, 0, stream_id());
SaveUint32(p + FRAME_HEAD_SIZE, stream_wu);
p += FRAME_HEAD_SIZE + 4;
const int64_t conn_wu = stream_wu + _conn_ctx->ReleaseDeferredWindowUpdate();
SerializeFrameHead(p, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(p + FRAME_HEAD_SIZE, conn_wu);
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
return MakeH2Error(H2_INTERNAL_ERROR);
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
return MakeH2Error(H2_INTERNAL_ERROR);
}
}
}
if (frame_head.flags & H2_FLAGS_END_STREAM) {
......@@ -947,14 +978,14 @@ H2ParseResult H2Context::OnSettings(
_local_settings = _unack_local_settings;
return MakeH2Message(NULL);
}
const int64_t old_initial_window_size = _remote_settings.initial_window_size;
const int64_t old_stream_window_size = _remote_settings.stream_window_size;
if (!ParseH2Settings(&_remote_settings, it, frame_head.payload_size)) {
LOG(ERROR) << "Fail to parse from SETTINGS";
return MakeH2Error(H2_PROTOCOL_ERROR);
}
const int64_t window_diff =
static_cast<int64_t>(_remote_settings.initial_window_size)
- old_initial_window_size;
static_cast<int64_t>(_remote_settings.stream_window_size)
- old_stream_window_size;
if (window_diff) {
// Do not update the connection flow-control window here, which can only
// be changed using WINDOW_UPDATE frames.
......@@ -1065,15 +1096,17 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
os << '\n';
}
const char sep = (opt.verbose ? '\n' : ' ');
os << "remote_window_left=" << _remote_window_left.load(butil::memory_order_relaxed)
<< sep << "deferred_window_update=" << _deferred_window_update.load(butil::memory_order_relaxed)
<< sep << "conn_state=" << H2ConnectionState2Str(_conn_state);
os << "conn_state=" << H2ConnectionState2Str(_conn_state);
if (is_server_side()) {
os << sep << "last_server_stream_id=" << _last_server_stream_id;
} else {
os << sep << "last_client_stream_id=" << _last_client_stream_id;
}
os << sep << "remote_settings=" << _remote_settings
os << sep << "deferred_window_update="
<< _deferred_window_update.load(butil::memory_order_relaxed)
<< sep << "remote_conn_window_left="
<< _remote_window_left.load(butil::memory_order_relaxed)
<< sep << "remote_settings=" << _remote_settings
<< sep << "local_settings=" << _local_settings
<< sep << "hpacker={";
IndentingOStream os2(os, 2);
......@@ -1103,20 +1136,21 @@ void H2Context::DeferWindowUpdate(int64_t size) {
return;
}
const int64_t acc = _deferred_window_update.fetch_add(size, butil::memory_order_relaxed) + size;
if (acc >= local_settings().initial_window_size / 2) {
if (acc >= local_settings().stream_window_size / 2) {
// Rarely happen for small messages.
const int64_t conn_wu = _deferred_window_update.exchange(0, butil::memory_order_relaxed);
char winbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu);
if (conn_wu > 0) {
char winbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu);
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
}
}
}
}
......@@ -1189,7 +1223,7 @@ void H2Context::ClearAbandonedStreamsImpl() {
void H2StreamContext::Init(H2Context* conn_ctx, int stream_id) {
_conn_ctx = conn_ctx;
_remote_window_left.store(conn_ctx->remote_settings().initial_window_size,
_remote_window_left.store(conn_ctx->remote_settings().stream_window_size,
butil::memory_order_relaxed);
header()._h2_stream_id = stream_id;
}
......@@ -1200,7 +1234,7 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
, _state(H2_STREAM_IDLE)
#endif
, _stream_ended(false)
, _remote_window_left(conn_ctx->remote_settings().initial_window_size)
, _remote_window_left(conn_ctx->remote_settings().stream_window_size)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
......@@ -1515,11 +1549,11 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
out->append(H2_CONNECTION_PREFACE_PREFIX,
H2_CONNECTION_PREFACE_PREFIX_SIZE);
char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE];
const size_t nb = SerializeH2Settings(
ctx->_unack_local_settings, settingsbuf + FRAME_HEAD_SIZE);
SerializeFrameHead(settingsbuf, nb, H2_FRAME_SETTINGS, 0, 0);
out->append(settingsbuf, FRAME_HEAD_SIZE + nb);
char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE +
FRAME_HEAD_SIZE + 4/*for WU*/];
const size_t nb = SerializeH2SettingsFrameAndWU(
ctx->_unack_local_settings, settingsbuf);
out->append(settingsbuf, nb);
}
// FIXME(gejun): Replace EAGAIN
......
......@@ -725,6 +725,11 @@ int Server::StartInternal(const butil::ip_t& ip,
_options = ServerOptions();
}
if (!_options.h2_settings.IsValid(true/*log_error*/)) {
LOG(ERROR) << "Invalid h2_settings";
return -1;
}
if (_options.http_master_service) {
// Check requirements for http_master_service:
// has "default_method" & request/response have no fields
......
......@@ -231,7 +231,7 @@ struct ServerOptions {
std::string enabled_protocols;
// Customize parameters of HTTP2, defined in http2.h
H2Settings http2_settings;
H2Settings h2_settings;
private:
// SSLOptions is large and not often used, allocate it on heap to
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment