Commit 75a80db8 authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: misc change & update docs

parent d5bc479e
......@@ -242,7 +242,7 @@ locality-aware,优先选择延时低的下游,直到其延时高于其他机
| ------------------------- | ----- | ---------------------------------------- | ----------------------- |
| health_check_interval (R) | 3 | seconds between consecutive health-checkings | src/brpc/socket_map.cpp |
在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,协议是Http,只有当Server返回200时,这个server才算恢复,可以通过-health\_check\_using\_rpc=true来打开这个功能,-health\_check\_path设置访问的路径(默认访问brpc自带的/health接口),-health\_check\_timeout\_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。
在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,协议是Http,只有当Server返回200时,这个server才算恢复,可以通过设置-health\_check\_path来打开这个功能(如果下游也是brpc,推荐设置成/health,服务健康的话会返回200),-health\_check\_timeout\_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。
# 发起访问
......
......@@ -556,14 +556,15 @@ int Channel::Weight() {
int Channel::CheckHealth() {
if (_lb == NULL) {
SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0 && !ptr->IsLogOff() &&
if (Socket::Address(_server_id, &ptr) == 0 &&
!ptr->IsLogOff() &&
!ptr->IsHealthCheckingUsingRPC()) {
return 0;
}
return -1;
} else {
SocketUniquePtr tmp_sock;
LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL, false};
LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
LoadBalancer::SelectOut sel_out(&tmp_sock);
return _lb->SelectServer(sel_in, &sel_out);
}
......
......@@ -998,7 +998,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
} else {
LoadBalancer::SelectIn sel_in =
{ start_realtime_us, true, has_request_code(),
_request_code, _accessed, health_check_call};
_request_code, _accessed };
LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) {
......
......@@ -118,6 +118,8 @@ friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
friend void policy::ProcessMongoRequest(InputMessageBase*);
friend void policy::ProcessThriftRequest(InputMessageBase*);
friend class OnHealthCheckRPCDone;
friend class HealthCheckManager;
// << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
......@@ -325,12 +327,6 @@ public:
bool is_done_allowed_to_run_in_place() const
{ return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
// Tell RPC that this particular call is used to do health check. These two
// functions are used by the developers of brpc and should not be touched or
// called by users.
void set_health_check_call(bool f) { set_flag(FLAGS_HEALTH_CHECK_CALL, f); }
bool has_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); }
// ------------------------------------------------------------------------
// Server-side methods.
// These calls shall be made from the server side only. Their results are
......@@ -590,6 +586,11 @@ private:
CallId id = { _correlation_id.value + nretry + 1 };
return id;
}
// Tell RPC that this particular call is used to do health check.
void set_health_check_call(bool f) { set_flag(FLAGS_HEALTH_CHECK_CALL, f); }
bool has_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); }
public:
CallId current_id() const {
CallId id = { _correlation_id.value + _current_call.nretry + 1 };
......
......@@ -40,7 +40,6 @@ public:
bool has_request_code;
uint64_t request_code;
const ExcludedServers* excluded;
bool health_check_call;
};
struct SelectOut {
......
......@@ -222,7 +222,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
return 0;
} else {
if (++choice == s->end()) {
......
......@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[i].id;
if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, &ptrs[nptr].first) == 0
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
int w = schan::GetSubChannelWeight(ptrs[nptr].first->user());
total_weight += w;
if (nptr < 8) {
......
......@@ -304,7 +304,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
}
} else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
......@@ -119,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
// We found an available server
return 0;
}
......
......@@ -123,7 +123,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
s.tls() = tls;
return 0;
}
......
......@@ -181,7 +181,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& (in.health_check_call || !(*out->ptr)->IsHealthCheckingUsingRPC())) {
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
// update tls.
tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position;
......
......@@ -290,8 +290,7 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
true,
_main_cntl->has_request_code(),
_main_cntl->_request_code,
_main_cntl->_accessed,
false };
_main_cntl->_accessed };
ChannelBalancer::SelectOut sel_out;
const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get())
->SelectChannel(sel_in, &sel_out);
......
......@@ -95,11 +95,11 @@ DEFINE_int32(connect_timeout_as_unreachable, 3,
"times *continuously*, the error is changed to ENETUNREACH which "
"fails the main socket as well when this socket is pooled.");
DEFINE_bool(health_check_using_rpc, false, "By default health check succeeds if server"
"can be connected. If this flag is set, health check is completed not only"
"when server can be connected but also an additional http call succeeds"
"indicated by FLAGS_health_check_path and FLAGS_health_check_timeout_ms");
DEFINE_string(health_check_path, "/health", "Http path of health check call");
// Http path requested by the application-level health check. The default is
// empty; Socket::Revive() only marks a socket as health-checking-using-rpc
// when this flag is non-empty.
// NOTE: adjacent string literals are concatenated verbatim, so every piece
// except the last must end with a space to keep the help text readable.
DEFINE_string(health_check_path, "", "Http path of health check call. "
              "By default health check succeeds if server can be connected. If this "
              "flag is set, health check is completed not only when server can be "
              "connected but also an additional http call succeeds indicated by this "
              "flag and FLAGS_health_check_timeout_ms");
DEFINE_int32(health_check_timeout_ms, 500, "Timeout of health check call");
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
......@@ -793,7 +793,7 @@ void Socket::Revive() {
} else {
LOG(INFO) << "Revived " << *this;
}
if (FLAGS_health_check_using_rpc) {
if (!FLAGS_health_check_path.empty()) {
_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
}
return;
......
......@@ -205,7 +205,7 @@ void* select_server(void* arg) {
brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
uint32_t rand_seed = rand();
if (sa->hash) {
......@@ -259,7 +259,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
// Accessing empty lb should result in error.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
......@@ -555,7 +555,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
const size_t SELECT_TIMES = 1000000;
std::map<butil::EndPoint, size_t> times;
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
::brpc::LoadBalancer::SelectOut out(&ptr);
for (size_t i = 0; i < SELECT_TIMES; ++i) {
in.has_request_code = true;
......@@ -632,7 +632,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// consistent with weight configured.
std::map<butil::EndPoint, size_t> select_result;
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
int total_weight = 12;
std::vector<butil::EndPoint> select_servers;
......@@ -690,15 +690,13 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
// The first socket is excluded. The second socket is logged off.
// The third socket is invalid.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
brpc::LoadBalancer::SelectOut out(&ptr);
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
brpc::ExcludedServers::Destroy(exclude);
}
TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
// If socket is revived and FLAGS_health_check_using_rpc is set,
// this socket should not be selected.
const char* servers[] = {
"10.92.115.19:8832",
"10.42.122.201:8833",
......@@ -727,7 +725,7 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
// Without setting anything, the lb should work fine
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
}
......@@ -737,7 +735,7 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
// After putting server[0] into health checking state, the only choice is servers[1]
......@@ -748,19 +746,22 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
// There is no server available
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
}
// set health_check_call to true, the lb should work fine
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
// After reset health checking state, the lb should work fine
bool get_server1 = false;
bool get_server2 = false;
// The probability of selecting the same server 20 times in a row is 1 / (2^19)
bool get_server2 = false;
for (int i = 0; i < 20; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, true };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
if (ptr->remote_side().port == 8832) {
......@@ -770,19 +771,6 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
}
}
ASSERT_TRUE(get_server1 && get_server2);
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
// After reset health checking state, the lb should work fine
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, false };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
}
delete lb;
}
}
......
......@@ -53,7 +53,7 @@ TEST_F(NamingServiceFilterTest, sanity) {
ASSERT_EQ(0, butil::hostname2endpoint("10.128.0.1:1234", &ep));
for (int i = 0; i < 10; ++i) {
brpc::SocketUniquePtr tmp_sock;
brpc::LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL, false };
brpc::LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
brpc::LoadBalancer::SelectOut sel_out(&tmp_sock);
ASSERT_EQ(0, channel._lb->SelectServer(sel_in, &sel_out));
ASSERT_EQ(ep, tmp_sock->remote_side());
......
......@@ -556,7 +556,6 @@ public:
TEST_F(SocketTest, health_check_using_rpc) {
int old_health_check_interval = brpc::FLAGS_health_check_interval;
GFLAGS_NS::SetCommandLineOption("health_check_using_rpc", "true");
GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
......@@ -610,7 +609,7 @@ TEST_F(SocketTest, health_check_using_rpc) {
ASSERT_GT(cntl.response_attachment().size(), (size_t)0);
}
GFLAGS_NS::SetCommandLineOption("health_check_using_rpc", "false");
GFLAGS_NS::SetCommandLineOption("health_check_path", "");
char hc_buf[8];
snprintf(hc_buf, sizeof(hc_buf), "%d", old_health_check_interval);
GFLAGS_NS::SetCommandLineOption("health_check_interval", hc_buf);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment