Commit ddcb0749 authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: use _ninflight_app_level_health_check to solve race

parent 3ab2b2d6
......@@ -558,7 +558,7 @@ int Channel::CheckHealth() {
SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0 &&
!ptr->IsLogOff() &&
!ptr->IsAppLevelHealthChecking()) {
!ptr->IsAppLevelHealthCheck()) {
return 0;
}
return -1;
......
......@@ -987,7 +987,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff() ||
(!is_health_check_call() && tmp_sock->IsAppLevelHealthChecking())) {
(!is_health_check_call() && tmp_sock->IsAppLevelHealthCheck())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
......
......@@ -222,7 +222,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
return 0;
} else {
if (++choice == s->end()) {
......
......@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[i].id;
if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, &ptrs[nptr].first) == 0
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
int w = schan::GetSubChannelWeight(ptrs[nptr].first->user());
total_weight += w;
if (nptr < 8) {
......
......@@ -304,7 +304,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
}
} else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
......@@ -119,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
// We found an available server
return 0;
}
......
......@@ -123,7 +123,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
s.tls() = tls;
return 0;
}
......
......@@ -181,7 +181,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
&& !(*out->ptr)->IsAppLevelHealthCheck()) {
// update tls.
tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position;
......
......@@ -483,7 +483,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
, _app_level_health_checking(false)
, _ninflight_app_level_health_check(0)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
......@@ -666,7 +666,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_app_level_health_checking.store(false, butil::memory_order_relaxed);
m->_ninflight_app_level_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
......@@ -758,7 +758,6 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q->clear();
}
}
_app_level_health_checking.store(!FLAGS_health_check_path.empty(), butil::memory_order_relaxed);
return 0;
}
......@@ -1014,14 +1013,15 @@ public:
void Run() {
std::unique_ptr<OnHealthCheckRPCDone> self_guard(this);
SocketUniquePtr ptr;
const int rc = Socket::Address(id, &ptr);
if (rc != 0) {
// If the socket is failed, Socket::SetFailed() will
// trigger next round of hc, just return here.
const int rc = Socket::AddressFailedAsWell(id, &ptr);
if (rc < 0) {
RPC_VLOG << "SocketId=" << id
<< " was abandoned during health checking";
return;
}
if (!cntl.Failed()) {
ptr->ResetAppLevelHealthChecking();
if (!cntl.Failed() || ptr->Failed()) {
ptr->_ninflight_app_level_health_check.fetch_sub(
1, butil::memory_order_relaxed);
return;
}
RPC_VLOG << "Fail to health check using rpc, error="
......@@ -1058,7 +1058,8 @@ public:
options.timeout_ms = FLAGS_health_check_timeout_ms;
if (done->channel.Init(id, &options) != 0) {
LOG(WARNING) << "Fail to init health check channel to SocketId=" << id;
ptr->ResetAppLevelHealthChecking();
ptr->_ninflight_app_level_health_check.fetch_sub(
1, butil::memory_order_relaxed);
delete done;
return;
}
......@@ -1113,9 +1114,13 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
if (ptr->CreatedByConnect()) {
s_vars->channel_conn << -1;
}
if (!FLAGS_health_check_path.empty()) {
ptr->_ninflight_app_level_health_check.fetch_add(
1, butil::memory_order_relaxed);
}
ptr->Revive();
ptr->_hc_count = 0;
if (ptr->IsAppLevelHealthChecking()) {
if (ptr->IsAppLevelHealthCheck()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
}
return false;
......@@ -2298,8 +2303,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed)
<< "\napp_level_health_checking="
<< ptr->_app_level_health_checking.load(butil::memory_order_relaxed)
<< "\nninflight_app_level_health_check="
<< ptr->_ninflight_app_level_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
......
......@@ -354,9 +354,7 @@ public:
// Check Whether the state is in app level health checking state or
// not, which means this socket would not be selected in further
// user request until app level check succeed.
bool IsAppLevelHealthChecking() const;
// Reset health check state to the initial state(which is false)
void ResetAppLevelHealthChecking();
bool IsAppLevelHealthCheck() const;
// Start to process edge-triggered events from the fd.
// This function does not block caller.
......@@ -800,9 +798,7 @@ private:
butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
// If this flag is set, socket is now in health check state using
// application-level rpc.
butil::atomic<bool> _app_level_health_checking;
butil::atomic<int64_t> _ninflight_app_level_health_check;
};
} // namespace brpc
......
......@@ -245,12 +245,8 @@ inline bool Socket::IsLogOff() const {
return _logoff_flag.load(butil::memory_order_relaxed);
}
inline bool Socket::IsAppLevelHealthChecking() const {
return _app_level_health_checking.load(butil::memory_order_relaxed);
}
inline void Socket::ResetAppLevelHealthChecking() {
_app_level_health_checking.store(false, butil::memory_order_relaxed);
inline bool Socket::IsAppLevelHealthCheck() const {
return (_ninflight_app_level_health_check.load(butil::memory_order_relaxed) != 0);
}
static const uint32_t EOF_FLAG = (1 << 31);
......
......@@ -696,7 +696,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
brpc::ExcludedServers::Destroy(exclude);
}
TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
TEST_F(LoadBalancerTest, health_check_no_valid_server) {
const char* servers[] = {
"10.92.115.19:8832",
"10.42.122.201:8833",
......@@ -732,18 +732,18 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->_app_level_health_checking.store(true, butil::memory_order_relaxed);
ptr->_ninflight_app_level_health_check.store(1, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
// After putting server[0] into health checking state, the only choice is servers[1]
// After putting server[0] into health check state, the only choice is servers[1]
ASSERT_EQ(ptr->remote_side().port, 8833);
}
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->_app_level_health_checking.store(true, butil::memory_order_relaxed);
ptr->_ninflight_app_level_health_check.store(1, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
......@@ -753,10 +753,10 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
}
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->ResetAppLevelHealthChecking();
ptr->_ninflight_app_level_health_check.store(0, butil::memory_order_relaxed);
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->ResetAppLevelHealthChecking();
// After reset health checking state, the lb should work fine
ptr->_ninflight_app_level_health_check.store(0, butil::memory_order_relaxed);
// After reset health check state, the lb should work fine
bool get_server1 = false;
bool get_server2 = false;
for (int i = 0; i < 20; ++i) {
......
......@@ -546,7 +546,7 @@ public:
brpc::Controller* cntl = (brpc::Controller*)cntl_base;
if (_sleep_flag) {
bthread_usleep(510000 /* 510ms, a little bit longer than the default
timeout of health checking rpc */);
timeout of health check rpc */);
}
cntl->response_attachment().append("OK");
}
......@@ -554,7 +554,7 @@ public:
bool _sleep_flag;
};
TEST_F(SocketTest, app_level_health_checking) {
TEST_F(SocketTest, app_level_health_check) {
int old_health_check_interval = brpc::FLAGS_health_check_interval;
GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
......@@ -591,7 +591,7 @@ TEST_F(SocketTest, app_level_health_checking) {
for (int i = 0; i < 4; ++i) {
// although ::connect would succeed, the stall in hc_service makes
// the health checking rpc fail.
// the health check rpc fail.
brpc::Controller cntl;
cntl.http_request().uri() = "/";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment