Commit d6ebf1d2 authored by zhujiashun's avatar zhujiashun Committed by gejun

add h2 sanity check ut

parent c622e64a
// Copyright (c) 2015 Baidu, Inc. // Copyright (c) 2014 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
// Jiashun Zhu(zhujiashun@baidu.com)
#include "brpc/policy/http2_rpc_protocol.h" #include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/details/controller_private_accessor.h" #include "brpc/details/controller_private_accessor.h"
#include "brpc/server.h" #include "brpc/server.h"
...@@ -284,6 +287,11 @@ H2Context::H2Context(Socket* socket, const Server* server) ...@@ -284,6 +287,11 @@ H2Context::H2Context(Socket* socket, const Server* server)
_unack_local_settings.initial_window_size = FLAGS_http2_client_initial_window_size; _unack_local_settings.initial_window_size = FLAGS_http2_client_initial_window_size;
_unack_local_settings.max_frame_size = FLAGS_http2_client_max_frame_size; _unack_local_settings.max_frame_size = FLAGS_http2_client_max_frame_size;
} }
#if defined(UNIT_TEST)
// In ut, we hope _last_client_stream_id run out quickly to test the correctness
// of creating new h2 socket
_last_client_stream_id = 0x7FFE795F;
#endif
} }
H2Context::~H2Context() { H2Context::~H2Context() {
...@@ -1528,12 +1536,17 @@ void PackH2Request(butil::IOBuf*, ...@@ -1528,12 +1536,17 @@ void PackH2Request(butil::IOBuf*,
void H2GlobalStreamCreator::ReplaceSocketForStream( void H2GlobalStreamCreator::ReplaceSocketForStream(
SocketUniquePtr* inout, Controller* cntl) { SocketUniquePtr* inout, Controller* cntl) {
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
do { do {
if (!(*inout)->_agent_socket) { if (!(*inout)->_agent_socket) {
break; break;
} }
H2Context* ctx = static_cast<H2Context*>((*inout)->_agent_socket->parsing_context()); H2Context* ctx = static_cast<H2Context*>((*inout)->_agent_socket->parsing_context());
// According to https://httpwg.org/specs/rfc7540.html#StreamIdentifiers:
// A client that is unable to establish a new stream identifier can establish
// a new connection for new streams.
if (ctx && ctx->RunOutStreams()) { if (ctx && ctx->RunOutStreams()) {
break; break;
} }
...@@ -1541,12 +1554,12 @@ void H2GlobalStreamCreator::ReplaceSocketForStream( ...@@ -1541,12 +1554,12 @@ void H2GlobalStreamCreator::ReplaceSocketForStream(
return; return;
} while (0); } while (0);
LOG(INFO) << "Ready to create h2 agent socket";
SocketId sid; SocketId sid;
SocketOptions opt = (*inout)->_options; SocketOptions opt = (*inout)->_options;
// Only main socket can be the owner of ssl_ctx // Only main socket can be the owner of ssl_ctx
opt.owns_ssl_ctx = false; opt.owns_ssl_ctx = false;
opt.health_check_interval_s = -1; opt.health_check_interval_s = -1;
// TODO(zhujiashun): Predictively create socket to improve performance
if (get_client_side_messenger()->Create(opt, &sid) != 0) { if (get_client_side_messenger()->Create(opt, &sid) != 0) {
cntl->SetFailed(EINVAL, "Fail to create H2 socket"); cntl->SetFailed(EINVAL, "Fail to create H2 socket");
return; return;
...@@ -1557,19 +1570,22 @@ void H2GlobalStreamCreator::ReplaceSocketForStream( ...@@ -1557,19 +1570,22 @@ void H2GlobalStreamCreator::ReplaceSocketForStream(
return; return;
} }
(*inout)->_agent_socket.swap(tmp_ptr); (*inout)->_agent_socket.swap(tmp_ptr);
mu.unlock();
(*inout)->_agent_socket->ReAddress(inout); (*inout)->_agent_socket->ReAddress(inout);
if (tmp_ptr) {
tmp_ptr->ReleaseAdditionalReference();
}
return; return;
} }
void H2GlobalStreamCreator::OnStreamCreationDone( void H2GlobalStreamCreator::OnStreamCreationDone(
SocketUniquePtr& sending_sock, Controller* cntl) { SocketUniquePtr& sending_sock, Controller* cntl) {
// TODO(zhujiashun): when server can not be connected, this can happen. // If any error happens during the time of sending rpc, this function
CHECK(false) << "Never run"; // would be called. Currently just do nothing.
} }
void H2GlobalStreamCreator::CleanupSocketForStream( void H2GlobalStreamCreator::CleanupSocketForStream(
Socket* prev_sock, Controller* cntl, int error_code) { Socket* prev_sock, Controller* cntl, int error_code) {
CHECK(false) << "Never run";
} }
StreamCreator* get_h2_global_stream_creator() { StreamCreator* get_h2_global_stream_creator() {
......
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
// Jiashun Zhu(zhujiashun@baidu.com)
#ifndef BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H #ifndef BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
#define BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H #define BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
......
...@@ -1061,7 +1061,10 @@ void Socket::OnRecycle() { ...@@ -1061,7 +1061,10 @@ void Socket::OnRecycle() {
delete _stream_set; delete _stream_set;
_stream_set = NULL; _stream_set = NULL;
_agent_socket.reset(NULL); if (_agent_socket) {
_agent_socket->ReleaseAdditionalReference();
_agent_socket.reset(NULL);
}
s_vars->nsocket << -1; s_vars->nsocket << -1;
} }
......
...@@ -921,4 +921,30 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) { ...@@ -921,4 +921,30 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) {
ASSERT_TRUE(reader->destroyed()); ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code()); ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
} }
TEST_F(HttpTest, http2_sanity) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "h2c";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
test::EchoRequest req;
req.set_message(EXP_REQUEST);
test::EchoResponse res;
int log_duration = 10000;
for (int i = 0; i < 200000; ++i) {
brpc::Controller cntl;
cntl.http_request().set_content_type("application/json");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
}
}
} //namespace } //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment