Commit 8b0cd6a8 authored by zhujiashun's avatar zhujiashun Committed by gejun

add h2 ut of flow control and settings

parent 69cb2dbd
......@@ -68,11 +68,6 @@ const char* H2StreamState2Str(H2StreamState s) {
return "unknown(H2StreamState)";
}
enum H2ConnectionState {
H2_CONNECTION_UNINITIALIZED,
H2_CONNECTION_READY,
H2_CONNECTION_GOAWAY,
};
static const char* H2ConnectionState2Str(H2ConnectionState s) {
switch (s) {
case H2_CONNECTION_UNINITIALIZED: return "UNINITIALIZED";
......@@ -122,31 +117,6 @@ const uint8_t H2_FLAGS_PRIORITY = 0x20;
#define H2_CONNECTION_PREFACE_PREFIX "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
const size_t H2_CONNECTION_PREFACE_PREFIX_SIZE = 24;
const size_t FRAME_HEAD_SIZE = 9;
// https://tools.ietf.org/html/rfc7540#section-4.1
struct H2FrameHead {
// The length of the frame payload expressed as an unsigned 24-bit integer.
// Values greater than H2Settings.max_frame_size MUST NOT be sent
uint32_t payload_size;
// The 8-bit type of the frame. The frame type determines the format and
// semantics of the frame. Implementations MUST ignore and discard any
// frame that has a type that is unknown.
H2FrameType type;
// An 8-bit field reserved for boolean flags specific to the frame type.
// Flags are assigned semantics specific to the indicated frame type.
// Flags that have no defined semantics for a particular frame type
// MUST be ignored and MUST be left unset (0x0) when sending.
uint8_t flags;
// A stream identifier (see Section 5.1.1) expressed as an unsigned 31-bit
// integer. The value 0x0 is reserved for frames that are associated with
// the connection as a whole as opposed to an individual stream.
int stream_id;
};
void SerializeFrameHead(void* out_buf,
uint32_t payload_size, H2FrameType type,
uint8_t flags, uint32_t stream_id) {
......@@ -167,8 +137,6 @@ inline void SerializeFrameHead(void* out_buf, const H2FrameHead& h) {
h.flags, h.stream_id);
}
static void InitFrameHandlers();
// [ https://tools.ietf.org/html/rfc7540#section-6.5.1 ]
enum H2SettingsIdentifier {
......@@ -287,89 +255,6 @@ static size_t SerializeH2SettingsFrameAndWU(const H2Settings& in, void* out) {
return static_cast<size_t>(p - (uint8_t*)out);
}
// Contexts of a http2 connection
class H2Context : public Destroyable, public Describable {
public:
typedef H2ParseResult (H2Context::*FrameHandler)(
butil::IOBufBytesIterator&, const H2FrameHead&);
// main_socket: the socket owns this object as parsing_context
// server: NULL means client-side
H2Context(Socket* main_socket, const Server* server);
~H2Context();
// Must be called before usage.
int Init();
H2ConnectionState state() const { return _conn_state; }
ParseResult Consume(butil::IOBufBytesIterator& it, Socket*);
void ClearAbandonedStreams();
void AddAbandonedStream(uint32_t stream_id);
//@Destroyable
void Destroy() override { delete this; }
int AllocateClientStreamId();
bool RunOutStreams() const;
// Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
uint32_t VolatilePendingStreamSize() const;
HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; }
const H2Settings& local_settings() const { return _local_settings; }
bool is_client_side() const { return _socket->CreatedByConnect(); }
bool is_server_side() const { return !is_client_side(); }
void Describe(std::ostream& os, const DescribeOptions&) const;
void DeferWindowUpdate(int64_t);
int64_t ReleaseDeferredWindowUpdate();
private:
friend class H2StreamContext;
friend class H2UnsentRequest;
friend class H2UnsentResponse;
friend void InitFrameHandlers();
ParseResult ConsumeFrameHead(butil::IOBufBytesIterator&, H2FrameHead*);
H2ParseResult OnData(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnHeaders(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPriority(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnResetStream(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnSettings(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPushPromise(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPing(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnGoAway(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnWindowUpdate(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const H2FrameHead&);
H2StreamContext* RemoveStream(int stream_id);
H2StreamContext* FindStream(int stream_id);
void ClearAbandonedStreamsImpl();
// True if the connection is established by client, otherwise it's
// accepted by server.
Socket* _socket;
butil::atomic<int64_t> _remote_window_left;
H2ConnectionState _conn_state;
int _last_server_stream_id;
uint32_t _last_client_stream_id;
H2Settings _remote_settings;
H2Settings _local_settings;
H2Settings _unack_local_settings;
HPacker _hpacker;
mutable butil::Mutex _abandoned_streams_mutex;
std::vector<uint32_t> _abandoned_streams;
typedef butil::FlatMap<int, H2StreamContext*> StreamMap;
mutable butil::Mutex _stream_mutex;
StreamMap _pending_streams;
butil::atomic<int64_t> _deferred_window_update;
};
inline bool AddWindowSize(butil::atomic<int64_t>* window_size, int64_t diff) {
// A sender MUST NOT allow a flow-control window to exceed 2^31 - 1.
// If a sender receives a WINDOW_UPDATE that causes a flow-control window
......@@ -406,9 +291,8 @@ inline bool MinusWindowSize(butil::atomic<int64_t>* window_size, int64_t size) {
}
static H2Context::FrameHandler s_frame_handlers[H2_FRAME_TYPE_MAX + 1];
static pthread_once_t s_frame_handlers_init_once = PTHREAD_ONCE_INIT;
static void InitFrameHandlers() {
void InitFrameHandlers() {
s_frame_handlers[H2_FRAME_DATA] = &H2Context::OnData;
s_frame_handlers[H2_FRAME_HEADERS] = &H2Context::OnHeaders;
s_frame_handlers[H2_FRAME_PRIORITY] = &H2Context::OnPriority;
......@@ -447,8 +331,8 @@ H2Context::H2Context(Socket* socket, const Server* server)
}
#if defined(UNIT_TEST)
// In ut, we hope _last_client_stream_id run out quickly to test the correctness
// of creating new h2 socket. This value is 100,000 less than 0x7FFFFFFF.
_last_client_stream_id = 0x7FFE795F;
// of creating new h2 socket. This value is 10,000 less than 0x7FFFFFFF.
_last_client_stream_id = 0x7fffd8ef;
#endif
}
......@@ -1504,8 +1388,8 @@ private:
void H2UnsentRequest::DestroyStreamUserData(SocketUniquePtr& sending_sock,
Controller* cntl,
int error_code,
bool end_of_rpc) {
int /*error_code*/,
bool /*end_of_rpc*/) {
RemoveRefOnQuit deref_self(this);
if (sending_sock != NULL && cntl->ErrorCode() != 0) {
CHECK_EQ(cntl, _cntl);
......
......@@ -64,7 +64,44 @@ inline H2ParseResult MakeH2Message(H2StreamContext* msg)
{ return H2ParseResult(msg); }
class H2Context;
class H2FrameHead;
enum H2FrameType {
H2_FRAME_DATA = 0x0,
H2_FRAME_HEADERS = 0x1,
H2_FRAME_PRIORITY = 0x2,
H2_FRAME_RST_STREAM = 0x3,
H2_FRAME_SETTINGS = 0x4,
H2_FRAME_PUSH_PROMISE = 0x5,
H2_FRAME_PING = 0x6,
H2_FRAME_GOAWAY = 0x7,
H2_FRAME_WINDOW_UPDATE = 0x8,
H2_FRAME_CONTINUATION = 0x9,
// ============================
H2_FRAME_TYPE_MAX = 0x9
};
// https://tools.ietf.org/html/rfc7540#section-4.1
struct H2FrameHead {
// The length of the frame payload expressed as an unsigned 24-bit integer.
// Values greater than H2Settings.max_frame_size MUST NOT be sent
uint32_t payload_size;
// The 8-bit type of the frame. The frame type determines the format and
// semantics of the frame. Implementations MUST ignore and discard any
// frame that has a type that is unknown.
H2FrameType type;
// An 8-bit field reserved for boolean flags specific to the frame type.
// Flags are assigned semantics specific to the indicated frame type.
// Flags that have no defined semantics for a particular frame type
// MUST be ignored and MUST be left unset (0x0) when sending.
uint8_t flags;
// A stream identifier (see Section 5.1.1) expressed as an unsigned 31-bit
// integer. The value 0x0 is reserved for frames that are associated with
// the connection as a whole as opposed to an individual stream.
int stream_id;
};
enum H2StreamState {
H2_STREAM_IDLE = 0,
......@@ -260,26 +297,104 @@ protected:
void DestroyStreamCreator(Controller* cntl) override;
};
enum H2FrameType {
H2_FRAME_DATA = 0x0,
H2_FRAME_HEADERS = 0x1,
H2_FRAME_PRIORITY = 0x2,
H2_FRAME_RST_STREAM = 0x3,
H2_FRAME_SETTINGS = 0x4,
H2_FRAME_PUSH_PROMISE = 0x5,
H2_FRAME_PING = 0x6,
H2_FRAME_GOAWAY = 0x7,
H2_FRAME_WINDOW_UPDATE = 0x8,
H2_FRAME_CONTINUATION = 0x9,
// ============================
H2_FRAME_TYPE_MAX = 0x9
enum H2ConnectionState {
H2_CONNECTION_UNINITIALIZED,
H2_CONNECTION_READY,
H2_CONNECTION_GOAWAY,
};
void SerializeFrameHead(void* out_buf,
uint32_t payload_size, H2FrameType type,
uint8_t flags, uint32_t stream_id);
} // namespace policy
size_t SerializeH2Settings(const H2Settings& in, void* out);
const size_t FRAME_HEAD_SIZE = 9;
// Contexts of a http2 connection
class H2Context : public Destroyable, public Describable {
public:
typedef H2ParseResult (H2Context::*FrameHandler)(
butil::IOBufBytesIterator&, const H2FrameHead&);
// main_socket: the socket owns this object as parsing_context
// server: NULL means client-side
H2Context(Socket* main_socket, const Server* server);
~H2Context();
// Must be called before usage.
int Init();
H2ConnectionState state() const { return _conn_state; }
ParseResult Consume(butil::IOBufBytesIterator& it, Socket*);
void ClearAbandonedStreams();
void AddAbandonedStream(uint32_t stream_id);
//@Destroyable
void Destroy() override { delete this; }
int AllocateClientStreamId();
bool RunOutStreams() const;
// Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
uint32_t VolatilePendingStreamSize() const;
HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; }
const H2Settings& local_settings() const { return _local_settings; }
bool is_client_side() const { return _socket->CreatedByConnect(); }
bool is_server_side() const { return !is_client_side(); }
void Describe(std::ostream& os, const DescribeOptions&) const;
void DeferWindowUpdate(int64_t);
int64_t ReleaseDeferredWindowUpdate();
private:
friend class H2StreamContext;
friend class H2UnsentRequest;
friend class H2UnsentResponse;
friend void InitFrameHandlers();
ParseResult ConsumeFrameHead(butil::IOBufBytesIterator&, H2FrameHead*);
H2ParseResult OnData(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnHeaders(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPriority(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnResetStream(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnSettings(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPushPromise(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnPing(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnGoAway(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnWindowUpdate(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const H2FrameHead&);
H2StreamContext* RemoveStream(int stream_id);
H2StreamContext* FindStream(int stream_id);
void ClearAbandonedStreamsImpl();
// True if the connection is established by client, otherwise it's
// accepted by server.
Socket* _socket;
butil::atomic<int64_t> _remote_window_left;
H2ConnectionState _conn_state;
int _last_server_stream_id;
uint32_t _last_client_stream_id;
H2Settings _remote_settings;
H2Settings _local_settings;
H2Settings _unack_local_settings;
HPacker _hpacker;
mutable butil::Mutex _abandoned_streams_mutex;
std::vector<uint32_t> _abandoned_streams;
typedef butil::FlatMap<int, H2StreamContext*> StreamMap;
mutable butil::Mutex _stream_mutex;
StreamMap _pending_streams;
butil::atomic<int64_t> _deferred_window_update;
};
} // namespace policy
} // namespace brpc
#endif // BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
......@@ -95,6 +95,11 @@ protected:
options.fd = _pipe_fds[1];
EXPECT_EQ(0, brpc::Socket::Create(options, &id));
EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
brpc::SocketOptions h2_client_options;
h2_client_options.user = brpc::get_client_side_messenger();
EXPECT_EQ(0, brpc::Socket::Create(h2_client_options, &id));
EXPECT_EQ(0, brpc::Socket::Address(id, &_h2_client_sock));
};
virtual ~HttpTest() {};
......@@ -177,10 +182,11 @@ protected:
msg->Destroy();
}
void MakeH2EchoRequestBuf(butil::IOBuf* out, brpc::Controller* cntl, brpc::Socket* socket, int* h2_stream_id) {
void MakeH2EchoRequestBuf(butil::IOBuf* out, brpc::Controller* cntl, int* h2_stream_id) {
butil::IOBuf request_buf;
test::EchoRequest req;
req.set_message(EXP_REQUEST);
cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
brpc::policy::SerializeHttpRequest(&request_buf, cntl, &req);
ASSERT_FALSE(cntl->Failed());
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(cntl);
......@@ -188,28 +194,30 @@ protected:
brpc::SocketMessage* socket_message = NULL;
brpc::policy::PackH2Request(NULL, &socket_message, cntl->call_id().value,
NULL, cntl, request_buf, NULL);
butil::Status st = socket_message->AppendAndDestroySelf(out, socket);
butil::Status st = socket_message->AppendAndDestroySelf(out, _h2_client_sock.get());
ASSERT_TRUE(st.ok());
*h2_stream_id = h2_req->_stream_id;
h2_req->DestroyStreamUserData(_h2_client_sock, cntl, 0, false);
}
void MakeH2EchoResponseBuf(butil::IOBuf* out, brpc::Socket* socket, int h2_stream_id) {
void MakeH2EchoResponseBuf(butil::IOBuf* out, int h2_stream_id) {
brpc::Controller cntl;
test::EchoResponse res;
res.set_message(EXP_RESPONSE);
cntl.http_request()._h2_stream_id = h2_stream_id;
cntl.http_request().set_content_type("application/proto");
res.set_message(EXP_RESPONSE);
{
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl.response_attachment());
EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
}
brpc::policy::H2UnsentResponse* h2_res = brpc::policy::H2UnsentResponse::New(&cntl);
butil::Status st = h2_res->AppendAndDestroySelf(out, socket);
butil::Status st = h2_res->AppendAndDestroySelf(out, _h2_client_sock.get());
ASSERT_TRUE(st.ok());
}
int _pipe_fds[2];
brpc::SocketUniquePtr _socket;
brpc::SocketUniquePtr _h2_client_sock;
brpc::Server _server;
MyEchoService _svc;
......@@ -672,7 +680,7 @@ TEST_F(HttpTest, read_long_body_progressively) {
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
options.protocol = brpc::PROTOCOL_HTTP2;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
......@@ -965,10 +973,14 @@ TEST_F(HttpTest, http2_sanity) {
options.protocol = "h2c";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
// 1) complete flow and
// 2) socket replacement when streamId runs out, the initial streamId is a special
// value set in ctor of H2Context
test::EchoRequest req;
req.set_message(EXP_REQUEST);
test::EchoResponse res;
for (int i = 0; i < 200000; ++i) {
for (int i = 0; i < 15000; ++i) {
brpc::Controller cntl;
cntl.http_request().set_content_type("application/json");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
......@@ -977,36 +989,37 @@ TEST_F(HttpTest, http2_sanity) {
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
}
// check connection window size
brpc::SocketUniquePtr main_ptr;
brpc::SocketUniquePtr agent_ptr;
EXPECT_EQ(brpc::Socket::Address(channel._server_id, &main_ptr), 0);
EXPECT_EQ(main_ptr->GetAgentSocket(&agent_ptr, NULL), 0);
brpc::policy::H2Context* ctx = static_cast<brpc::policy::H2Context*>(agent_ptr->parsing_context());
ASSERT_GT(ctx->_remote_window_left.load(butil::memory_order_relaxed),
brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / 2);
}
TEST_F(HttpTest, http2_ping) {
// This test injects PING frames before and after header and data.
brpc::Controller cntl;
brpc::SocketUniquePtr client_sock;
brpc::SocketId id;
brpc::SocketOptions options;
options.user = brpc::get_client_side_messenger();
EXPECT_EQ(0, brpc::Socket::Create(options, &id));
ASSERT_EQ(0, brpc::Socket::Address(id, &client_sock));
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, client_sock.get(), &h2_stream_id);
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
MakeH2EchoResponseBuf(&res_out, client_sock.get(), h2_stream_id);
char pingbuf[9 /*FRAME_HEAD_SIZE*/ + 8 /*Opaque Data*/];
brpc::policy::SerializeFrameHead(pingbuf, 8, brpc::policy::H2_FRAME_PING, 0, 0);
butil::IOBuf total_buf;
// insert ping before header and data
total_buf.append(pingbuf, sizeof(pingbuf));
total_buf.append(res_out);
res_out.append(pingbuf, sizeof(pingbuf));
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
// insert ping after header and data
total_buf.append(pingbuf, sizeof(pingbuf));
res_out.append(pingbuf, sizeof(pingbuf));
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&total_buf, client_sock.get(), false, NULL);
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
......@@ -1023,29 +1036,20 @@ inline void SaveUint32(void* out, uint32_t v) {
TEST_F(HttpTest, http2_rst_before_header) {
brpc::Controller cntl;
brpc::SocketUniquePtr client_sock;
brpc::SocketId id;
brpc::SocketOptions options;
options.user = brpc::get_client_side_messenger();
EXPECT_EQ(0, brpc::Socket::Create(options, &id));
ASSERT_EQ(0, brpc::Socket::Address(id, &client_sock));
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, client_sock.get(), &h2_stream_id);
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
MakeH2EchoResponseBuf(&res_out, client_sock.get(), h2_stream_id);
char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
butil::IOBuf total_buf;
total_buf.append(rstbuf, sizeof(rstbuf));
total_buf.append(res_out);
res_out.append(rstbuf, sizeof(rstbuf));
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&total_buf, client_sock.get(), false, NULL);
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
......@@ -1056,29 +1060,20 @@ TEST_F(HttpTest, http2_rst_before_header) {
TEST_F(HttpTest, http2_rst_after_header_and_data) {
brpc::Controller cntl;
brpc::SocketUniquePtr client_sock;
brpc::SocketId id;
brpc::SocketOptions options;
options.user = brpc::get_client_side_messenger();
EXPECT_EQ(0, brpc::Socket::Create(options, &id));
ASSERT_EQ(0, brpc::Socket::Address(id, &client_sock));
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, client_sock.get(), &h2_stream_id);
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
MakeH2EchoResponseBuf(&res_out, client_sock.get(), h2_stream_id);
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
butil::IOBuf total_buf;
total_buf.append(res_out);
total_buf.append(rstbuf, sizeof(rstbuf));
res_out.append(rstbuf, sizeof(rstbuf));
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&total_buf, client_sock.get(), false, NULL);
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
......@@ -1086,4 +1081,75 @@ TEST_F(HttpTest, http2_rst_after_header_and_data) {
ASSERT_TRUE(cntl.http_response().status_code() == brpc::HTTP_STATUS_OK);
}
TEST_F(HttpTest, http2_window_used_up) {
brpc::Controller cntl;
butil::IOBuf request_buf;
test::EchoRequest req;
// longer message to trigger using up window size sooner
req.set_message("FLOW_CONTROL_FLOW_CONTROL");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().set_content_type("application/proto");
brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size();
for (int i = 0; i <= nsuc; i++) {
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
cntl._current_call.stream_user_data = h2_req;
brpc::SocketMessage* socket_message = NULL;
brpc::policy::PackH2Request(NULL, &socket_message, cntl.call_id().value,
NULL, &cntl, request_buf, NULL);
butil::IOBuf dummy;
butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
if (i == nsuc) {
// the last message should fail according to flow control policy.
ASSERT_FALSE(st.ok());
ASSERT_TRUE(st.error_code() == EAGAIN);
ASSERT_TRUE(butil::StringPiece(st.error_str()).starts_with("remote_window_left is not enough"));
} else {
ASSERT_TRUE(st.ok());
}
h2_req->DestroyStreamUserData(_h2_client_sock, &cntl, 0, false);
}
}
TEST_F(HttpTest, http2_settings) {
char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
brpc::H2Settings h2_settings;
h2_settings.header_table_size = 8192;
h2_settings.max_concurrent_streams = 1024;
h2_settings.stream_window_size= (1u << 29) - 1;
const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
brpc::policy::H2Context* ctx = new brpc::policy::H2Context(_socket.get(), NULL);
CHECK_EQ(ctx->Init(), 0);
_socket->initialize_parsing_context(&ctx);
ctx->_conn_state = brpc::policy::H2_CONNECTION_READY;
// parse settings
brpc::policy::ParseH2Message(&buf, _socket.get(), false, NULL);
butil::IOPortal response_buf;
CHECK_EQ(response_buf.append_from_file_descriptor(_pipe_fds[0], 1024), (ssize_t)brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::H2FrameHead frame_head;
butil::IOBufBytesIterator it(response_buf);
ctx->ConsumeFrameHead(it, &frame_head);
CHECK_EQ(frame_head.type, brpc::policy::H2_FRAME_SETTINGS);
CHECK_EQ(frame_head.flags, 0x01 /* H2_FLAGS_ACK */);
CHECK_EQ(frame_head.stream_id, 0);
ASSERT_TRUE(ctx->_remote_settings.header_table_size == 8192);
ASSERT_TRUE(ctx->_remote_settings.max_concurrent_streams == 1024);
ASSERT_TRUE(ctx->_remote_settings.stream_window_size == (1u << 29) - 1);
}
TEST_F(HttpTest, http2_invalid_settings) {
}
TEST_F(HttpTest, http2_client_not_close_socket_when_timeout) {
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment