Commit 4acdf923 authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: replace app_level_health_check with app_health_check

parent ddcb0749
...@@ -558,7 +558,7 @@ int Channel::CheckHealth() { ...@@ -558,7 +558,7 @@ int Channel::CheckHealth() {
SocketUniquePtr ptr; SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0 && if (Socket::Address(_server_id, &ptr) == 0 &&
!ptr->IsLogOff() && !ptr->IsLogOff() &&
!ptr->IsAppLevelHealthCheck()) { !ptr->IsAppHealthCheck()) {
return 0; return 0;
} }
return -1; return -1;
......
...@@ -987,7 +987,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -987,7 +987,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// of the backup call. // of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock); const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff() || if (rc != 0 || tmp_sock->IsLogOff() ||
(!is_health_check_call() && tmp_sock->IsAppLevelHealthCheck())) { (!is_health_check_call() && tmp_sock->IsAppHealthCheck())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64, SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id); endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP tmp_sock.reset(); // Release ref ASAP
......
...@@ -222,7 +222,7 @@ int ConsistentHashingLoadBalancer::SelectServer( ...@@ -222,7 +222,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id)) || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0 && Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
return 0; return 0;
} else { } else {
if (++choice == s->end()) { if (++choice == s->end()) {
......
...@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[i].id; const SocketId id = s->server_list[i].id;
if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id)) if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, &ptrs[nptr].first) == 0 && Socket::Address(id, &ptrs[nptr].first) == 0
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
int w = schan::GetSubChannelWeight(ptrs[nptr].first->user()); int w = schan::GetSubChannelWeight(ptrs[nptr].first->user());
total_weight += w; total_weight += w;
if (nptr < 8) { if (nptr < 8) {
......
...@@ -304,7 +304,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -304,7 +304,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
} }
} else if (Socket::Address(info.server_id, out->ptr) == 0 } else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again. // choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) { || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
...@@ -119,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -119,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
// We found an available server // We found an available server
return 0; return 0;
} }
......
...@@ -123,7 +123,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -123,7 +123,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
s.tls() = tls; s.tls() = tls;
return 0; return 0;
} }
......
...@@ -181,7 +181,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -181,7 +181,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (!ExcludedServers::IsExcluded(in.excluded, server_id) if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0 && Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsLogOff()
&& !(*out->ptr)->IsAppLevelHealthCheck()) { && !(*out->ptr)->IsAppHealthCheck()) {
// update tls. // update tls.
tls.remain_server = tls_temp.remain_server; tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position; tls.position = tls_temp.position;
......
...@@ -483,7 +483,7 @@ Socket::Socket(Forbidden) ...@@ -483,7 +483,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL) , _epollout_butex(NULL)
, _write_head(NULL) , _write_head(NULL)
, _stream_set(NULL) , _stream_set(NULL)
, _ninflight_app_level_health_check(0) , _ninflight_app_health_check(0)
{ {
CreateVarsOnce(); CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL); pthread_mutex_init(&_id_wait_list_mutex, NULL);
...@@ -666,7 +666,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { ...@@ -666,7 +666,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0; m->_error_code = 0;
m->_error_text.clear(); m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed); m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_ninflight_app_level_health_check.store(0, butil::memory_order_relaxed); m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787 // NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512); const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) { if (rc) {
...@@ -1020,7 +1020,7 @@ public: ...@@ -1020,7 +1020,7 @@ public:
return; return;
} }
if (!cntl.Failed() || ptr->Failed()) { if (!cntl.Failed() || ptr->Failed()) {
ptr->_ninflight_app_level_health_check.fetch_sub( ptr->_ninflight_app_health_check.fetch_sub(
1, butil::memory_order_relaxed); 1, butil::memory_order_relaxed);
return; return;
} }
...@@ -1058,7 +1058,7 @@ public: ...@@ -1058,7 +1058,7 @@ public:
options.timeout_ms = FLAGS_health_check_timeout_ms; options.timeout_ms = FLAGS_health_check_timeout_ms;
if (done->channel.Init(id, &options) != 0) { if (done->channel.Init(id, &options) != 0) {
LOG(WARNING) << "Fail to init health check channel to SocketId=" << id; LOG(WARNING) << "Fail to init health check channel to SocketId=" << id;
ptr->_ninflight_app_level_health_check.fetch_sub( ptr->_ninflight_app_health_check.fetch_sub(
1, butil::memory_order_relaxed); 1, butil::memory_order_relaxed);
delete done; delete done;
return; return;
...@@ -1115,12 +1115,12 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -1115,12 +1115,12 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
s_vars->channel_conn << -1; s_vars->channel_conn << -1;
} }
if (!FLAGS_health_check_path.empty()) { if (!FLAGS_health_check_path.empty()) {
ptr->_ninflight_app_level_health_check.fetch_add( ptr->_ninflight_app_health_check.fetch_add(
1, butil::memory_order_relaxed); 1, butil::memory_order_relaxed);
} }
ptr->Revive(); ptr->Revive();
ptr->_hc_count = 0; ptr->_hc_count = 0;
if (ptr->IsAppLevelHealthCheck()) { if (ptr->IsAppHealthCheck()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s); HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
} }
return false; return false;
...@@ -2303,8 +2303,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ...@@ -2303,8 +2303,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nauth_context=" << ptr->_auth_context << "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed) << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed) << "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed)
<< "\nninflight_app_level_health_check=" << "\nninflight_app_health_check="
<< ptr->_ninflight_app_level_health_check.load(butil::memory_order_relaxed) << ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id="; << "\nagent_socket_id=";
const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed); const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) { if (asid != INVALID_SOCKET_ID) {
......
...@@ -354,7 +354,7 @@ public: ...@@ -354,7 +354,7 @@ public:
// Check Whether the state is in app level health checking state or // Check Whether the state is in app level health checking state or
// not, which means this socket would not be selected in further // not, which means this socket would not be selected in further
// user request until app level check succeed. // user request until app level check succeed.
bool IsAppLevelHealthCheck() const; bool IsAppHealthCheck() const;
// Start to process edge-triggered events from the fd. // Start to process edge-triggered events from the fd.
// This function does not block caller. // This function does not block caller.
...@@ -798,7 +798,7 @@ private: ...@@ -798,7 +798,7 @@ private:
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set; std::set<StreamId> *_stream_set;
butil::atomic<int64_t> _ninflight_app_level_health_check; butil::atomic<int64_t> _ninflight_app_health_check;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -245,8 +245,8 @@ inline bool Socket::IsLogOff() const { ...@@ -245,8 +245,8 @@ inline bool Socket::IsLogOff() const {
return _logoff_flag.load(butil::memory_order_relaxed); return _logoff_flag.load(butil::memory_order_relaxed);
} }
inline bool Socket::IsAppLevelHealthCheck() const { inline bool Socket::IsAppHealthCheck() const {
return (_ninflight_app_level_health_check.load(butil::memory_order_relaxed) != 0); return (_ninflight_app_health_check.load(butil::memory_order_relaxed) != 0);
} }
static const uint32_t EOF_FLAG = (1 << 31); static const uint32_t EOF_FLAG = (1 << 31);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment