Commit 3ab2b2d6 authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: change _health_checking_using_rpc to…

health_check_using_rpc: change _health_checking_using_rpc to _app_level_health_checking & use Address in OnHealthCheckRPCDone
parent 9d5b1366
......@@ -558,7 +558,7 @@ int Channel::CheckHealth() {
SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0 &&
!ptr->IsLogOff() &&
!ptr->IsHealthCheckingUsingRPC()) {
!ptr->IsAppLevelHealthChecking()) {
return 0;
}
return -1;
......
......@@ -987,7 +987,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff() ||
(!is_health_check_call() && tmp_sock->IsHealthCheckingUsingRPC())) {
(!is_health_check_call() && tmp_sock->IsAppLevelHealthChecking())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
......
......@@ -222,7 +222,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
return 0;
} else {
if (++choice == s->end()) {
......
......@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[i].id;
if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, &ptrs[nptr].first) == 0
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
int w = schan::GetSubChannelWeight(ptrs[nptr].first->user());
total_weight += w;
if (nptr < 8) {
......
......@@ -304,7 +304,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
}
} else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
......@@ -119,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
// We found an available server
return 0;
}
......
......@@ -123,7 +123,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
s.tls() = tls;
return 0;
}
......
......@@ -181,7 +181,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsHealthCheckingUsingRPC()) {
&& !(*out->ptr)->IsAppLevelHealthChecking()) {
// update tls.
tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position;
......
......@@ -483,7 +483,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
, _health_checking_using_rpc(false)
, _app_level_health_checking(false)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
......@@ -666,7 +666,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
m->_app_level_health_checking.store(false, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
......@@ -758,7 +758,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q->clear();
}
}
_health_checking_using_rpc.store(!FLAGS_health_check_path.empty(), butil::memory_order_relaxed);
_app_level_health_checking.store(!FLAGS_health_check_path.empty(), butil::memory_order_relaxed);
return 0;
}
......@@ -1014,24 +1014,16 @@ public:
void Run() {
std::unique_ptr<OnHealthCheckRPCDone> self_guard(this);
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(id, &ptr);
if (rc < 0) {
RPC_VLOG << "SocketId=" << id
<< " was abandoned during health checking";
const int rc = Socket::Address(id, &ptr);
if (rc != 0) {
// If the socket is failed, Socket::SetFailed() will
// trigger next round of hc, just return here.
return;
}
if (!cntl.Failed()) {
ptr->ResetHealthCheckingUsingRPC();
return;
}
// Socket::SetFailed() will trigger next round of hc, just
// return here.
if (cntl.Failed() && ptr->Failed()) {
ptr->ResetAppLevelHealthChecking();
return;
}
// the left case is cntl.Failed() && !ptr->Failed(),
// in which we should retry hc rpc.
RPC_VLOG << "Fail to health check using rpc, error="
<< cntl.ErrorText();
bthread_usleep(interval_s * 1000000);
......@@ -1066,7 +1058,7 @@ public:
options.timeout_ms = FLAGS_health_check_timeout_ms;
if (done->channel.Init(id, &options) != 0) {
LOG(WARNING) << "Fail to init health check channel to SocketId=" << id;
ptr->ResetHealthCheckingUsingRPC();
ptr->ResetAppLevelHealthChecking();
delete done;
return;
}
......@@ -1123,7 +1115,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
}
ptr->Revive();
ptr->_hc_count = 0;
if (ptr->IsHealthCheckingUsingRPC()) {
if (ptr->IsAppLevelHealthChecking()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
}
return false;
......@@ -2306,8 +2298,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed)
<< "\nhealth_checking_using_rpc="
<< ptr->_health_checking_using_rpc.load(butil::memory_order_relaxed)
<< "\napp_level_health_checking="
<< ptr->_app_level_health_checking.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
......
......@@ -351,13 +351,12 @@ public:
void SetLogOff();
bool IsLogOff() const;
// Check Whether the state is in health check using rpc state or
// Check Whether the state is in app level health checking state or
// not, which means this socket would not be selected in further
// user request until rpc succeed and can only be used by health
// check rpc call.
bool IsHealthCheckingUsingRPC() const;
// user request until app level check succeed.
bool IsAppLevelHealthChecking() const;
// Reset health check state to the initial state(which is false)
void ResetHealthCheckingUsingRPC();
void ResetAppLevelHealthChecking();
// Start to process edge-triggered events from the fd.
// This function does not block caller.
......@@ -803,7 +802,7 @@ private:
// If this flag is set, socket is now in health check state using
// application-level rpc.
butil::atomic<bool> _health_checking_using_rpc;
butil::atomic<bool> _app_level_health_checking;
};
} // namespace brpc
......
......@@ -245,12 +245,12 @@ inline bool Socket::IsLogOff() const {
return _logoff_flag.load(butil::memory_order_relaxed);
}
inline bool Socket::IsHealthCheckingUsingRPC() const {
return _health_checking_using_rpc.load(butil::memory_order_relaxed);
inline bool Socket::IsAppLevelHealthChecking() const {
return _app_level_health_checking.load(butil::memory_order_relaxed);
}
inline void Socket::ResetHealthCheckingUsingRPC() {
_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
inline void Socket::ResetAppLevelHealthChecking() {
_app_level_health_checking.store(false, butil::memory_order_relaxed);
}
static const uint32_t EOF_FLAG = (1 << 31);
......
......@@ -732,7 +732,7 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
ptr->_app_level_health_checking.store(true, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
......@@ -743,7 +743,7 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
}
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
ptr->_app_level_health_checking.store(true, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
......@@ -753,9 +753,9 @@ TEST_F(LoadBalancerTest, health_checking_no_valid_server) {
}
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
ptr->ResetAppLevelHealthChecking();
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->ResetHealthCheckingUsingRPC();
ptr->ResetAppLevelHealthChecking();
// After reset health checking state, the lb should work fine
bool get_server1 = false;
bool get_server2 = false;
......
......@@ -554,7 +554,7 @@ public:
bool _sleep_flag;
};
TEST_F(SocketTest, health_check_using_rpc) {
TEST_F(SocketTest, app_level_health_checking) {
int old_health_check_interval = brpc::FLAGS_health_check_interval;
GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment