Commit 74951b9d authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: 1.revise docs 2.combine IsLogOff and IsAppHealthCheck into IsAvailable

parent cc3de349
...@@ -242,7 +242,7 @@ locality-aware,优先选择延时低的下游,直到其延时高于其他机 ...@@ -242,7 +242,7 @@ locality-aware,优先选择延时低的下游,直到其延时高于其他机
| ------------------------- | ----- | ---------------------------------------- | ----------------------- | | ------------------------- | ----- | ---------------------------------------- | ----------------------- |
| health_check_interval (R) | 3 | seconds between consecutive health-checkings | src/brpc/socket_map.cpp | | health_check_interval (R) | 3 | seconds between consecutive health-checkings | src/brpc/socket_map.cpp |
在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,协议是Http,只有当Server返回200时,这个server才算恢复,可以通过把-health\_check\_path设置成被检查的路径来打开这个功能(如果下游也是brpc,推荐设置成/health,服务健康的话会返回200),-health\_check\_timeout\_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。 在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,框架会发送一个HTTP GET请求到该server,请求路径通过-health\_check\_path设置(默认为空),只有当server返回200时,它才会恢复。在两种健康检查机制下,都可通过-health\_check\_timeout\_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。
# 发起访问 # 发起访问
......
...@@ -556,9 +556,7 @@ int Channel::Weight() { ...@@ -556,9 +556,7 @@ int Channel::Weight() {
int Channel::CheckHealth() { int Channel::CheckHealth() {
if (_lb == NULL) { if (_lb == NULL) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0 && if (Socket::Address(_server_id, &ptr) == 0 && !ptr->IsAvailable()) {
!ptr->IsLogOff() &&
!ptr->IsAppHealthCheck()) {
return 0; return 0;
} }
return -1; return -1;
......
...@@ -986,8 +986,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -986,8 +986,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Don't use _current_call.peer_id which is set to -1 after construction // Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call. // of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock); const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff() || if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
(!is_health_check_call() && tmp_sock->IsAppHealthCheck())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64, SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id); endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP tmp_sock.reset(); // Release ref ASAP
......
...@@ -118,8 +118,6 @@ friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); ...@@ -118,8 +118,6 @@ friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessMongoRequest(InputMessageBase*);
friend void policy::ProcessThriftRequest(InputMessageBase*); friend void policy::ProcessThriftRequest(InputMessageBase*);
friend class OnAppHealthCheckDone;
friend class HealthCheckManager;
// << Flags >> // << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1; static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1); static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
...@@ -588,7 +586,6 @@ private: ...@@ -588,7 +586,6 @@ private:
} }
// Tell RPC that this particular call is used to do health check. // Tell RPC that this particular call is used to do health check.
void set_health_check_call(bool f) { set_flag(FLAGS_HEALTH_CHECK_CALL, f); }
bool is_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); } bool is_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); }
public: public:
......
...@@ -138,6 +138,11 @@ public: ...@@ -138,6 +138,11 @@ public:
return *this; return *this;
} }
ControllerPrivateAccessor& set_health_check_call() {
_cntl->add_flag(Controller::FLAGS_HEALTH_CHECK_CALL);
return *this;
}
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
...@@ -221,8 +221,7 @@ int ConsistentHashingLoadBalancer::SelectServer( ...@@ -221,8 +221,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
if (((i + 1) == s->size() // always take last chance if (((i + 1) == s->size() // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id)) || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0 && Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && (*out->ptr)->IsAvailable()) {
&& !(*out->ptr)->IsAppHealthCheck()) {
return 0; return 0;
} else { } else {
if (++choice == s->end()) { if (++choice == s->end()) {
......
...@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -123,7 +123,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
const SocketId id = s->server_list[i].id; const SocketId id = s->server_list[i].id;
if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id)) if ((!exclusion || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, &ptrs[nptr].first) == 0 && Socket::Address(id, &ptrs[nptr].first) == 0
&& !(*out->ptr)->IsAppHealthCheck()) { && (ptrs[nptr].first)->IsAvailable()) {
int w = schan::GetSubChannelWeight(ptrs[nptr].first->user()); int w = schan::GetSubChannelWeight(ptrs[nptr].first->user());
total_weight += w; total_weight += w;
if (nptr < 8) { if (nptr < 8) {
......
...@@ -303,8 +303,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -303,8 +303,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
continue; continue;
} }
} else if (Socket::Address(info.server_id, out->ptr) == 0 } else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && (*out->ptr)->IsAvailable()) {
&& !(*out->ptr)->IsAppHealthCheck()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again. // choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) { || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
...@@ -118,8 +118,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -118,8 +118,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (((i + 1) == n // always take last chance if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && (*out->ptr)->IsAvailable()) {
&& !(*out->ptr)->IsAppHealthCheck()) {
// We found an available server // We found an available server
return 0; return 0;
} }
......
...@@ -122,8 +122,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -122,8 +122,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (((i + 1) == n // always take last chance if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && (*out->ptr)->IsAvailable()) {
&& !(*out->ptr)->IsAppHealthCheck()) {
s.tls() = tls; s.tls() = tls;
return 0; return 0;
} }
......
...@@ -180,8 +180,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -180,8 +180,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
SocketId server_id = GetServerInNextStride(s->server_list, filter, tls_temp); SocketId server_id = GetServerInNextStride(s->server_list, filter, tls_temp);
if (!ExcludedServers::IsExcluded(in.excluded, server_id) if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0 && Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff() && !(*out->ptr)->IsAvailable()) {
&& !(*out->ptr)->IsAppHealthCheck()) {
// update tls. // update tls.
tls.remain_server = tls_temp.remain_server; tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position; tls.position = tls_temp.position;
......
...@@ -50,6 +50,7 @@ ...@@ -50,6 +50,7 @@
#include "brpc/periodic_task.h" #include "brpc/periodic_task.h"
#include "brpc/channel.h" #include "brpc/channel.h"
#include "brpc/controller.h" #include "brpc/controller.h"
#include "details/controller_private_accessor.h"
#include "brpc/global.h" #include "brpc/global.h"
#if defined(OS_MACOSX) #if defined(OS_MACOSX)
#include <sys/event.h> #include <sys/event.h>
...@@ -100,7 +101,9 @@ DEFINE_string(health_check_path, "", "Http path of health check call." ...@@ -100,7 +101,9 @@ DEFINE_string(health_check_path, "", "Http path of health check call."
"flag is set, health check is completed not only when server can be" "flag is set, health check is completed not only when server can be"
"connected but also an additional http call succeeds indicated by this" "connected but also an additional http call succeeds indicated by this"
"flag and FLAGS_health_check_timeout_ms"); "flag and FLAGS_health_check_timeout_ms");
DEFINE_int32(health_check_timeout_ms, 500, "Timeout of health check call"); DEFINE_int32(health_check_timeout_ms, 500, "Timeout of health check."
"If FLAGS_health_check_path is empty, it means timeout of connect."
"Otherwise it means timeout of app health check call.");
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) { static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
return v >= 2 && v < 1000/*large enough*/; return v >= 2 && v < 1000/*large enough*/;
...@@ -1030,7 +1033,7 @@ public: ...@@ -1030,7 +1033,7 @@ public:
bthread_usleep(interval_s * 1000000); bthread_usleep(interval_s * 1000000);
cntl.Reset(); cntl.Reset();
cntl.http_request().uri() = FLAGS_health_check_path; cntl.http_request().uri() = FLAGS_health_check_path;
cntl.set_health_check_call(true); ControllerPrivateAccessor(&cntl).set_health_check_call();
channel.CallMethod(NULL, &cntl, NULL, NULL, self_guard.release()); channel.CallMethod(NULL, &cntl, NULL, NULL, self_guard.release());
} }
...@@ -1066,7 +1069,7 @@ public: ...@@ -1066,7 +1069,7 @@ public:
return; return;
} }
done->cntl.http_request().uri() = FLAGS_health_check_path; done->cntl.http_request().uri() = FLAGS_health_check_path;
done->cntl.set_health_check_call(true); ControllerPrivateAccessor(&done->cntl).set_health_check_call();
done->channel.CallMethod(NULL, &done->cntl, NULL, NULL, done); done->channel.CallMethod(NULL, &done->cntl, NULL, NULL, done);
} }
}; };
...@@ -1122,7 +1125,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -1122,7 +1125,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
} }
ptr->Revive(); ptr->Revive();
ptr->_hc_count = 0; ptr->_hc_count = 0;
if (ptr->IsAppHealthCheck()) { if (!FLAGS_health_check_path.empty()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s); HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
} }
return false; return false;
...@@ -1313,7 +1316,6 @@ int Socket::Connect(const timespec* abstime, ...@@ -1313,7 +1316,6 @@ int Socket::Connect(const timespec* abstime,
// We need to do async connect (to manage the timeout by ourselves). // We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd)); CHECK_EQ(0, butil::make_non_blocking(sockfd));
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
bzero((char*)&serv_addr, sizeof(serv_addr)); bzero((char*)&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
...@@ -2391,11 +2393,9 @@ int Socket::CheckHealth() { ...@@ -2391,11 +2393,9 @@ int Socket::CheckHealth() {
if (_hc_count == 0) { if (_hc_count == 0) {
LOG(INFO) << "Checking " << *this; LOG(INFO) << "Checking " << *this;
} }
// Note: No timeout. Timeout setting is given to Write() which const timespec duetime =
// we don't know. A drawback is that if a connection takes long butil::milliseconds_from_now(FLAGS_health_check_timeout_ms);
// but finally succeeds(indicating unstable network?), we still const int connected_fd = Connect(&duetime, NULL, NULL);
// revive the socket.
const int connected_fd = Connect(NULL/*Note*/, NULL, NULL);
if (connected_fd >= 0) { if (connected_fd >= 0) {
::close(connected_fd); ::close(connected_fd);
return 0; return 0;
......
...@@ -346,15 +346,12 @@ public: ...@@ -346,15 +346,12 @@ public:
// Set ELOGOFF flag to this `Socket' which means further requests // Set ELOGOFF flag to this `Socket' which means further requests
// through this `Socket' will receive an ELOGOFF error. This only // through this `Socket' will receive an ELOGOFF error. This only
// affects return value of `IsLogOff' and won't close the inner fd // affects return value of `IsAvailable' and won't close the inner
// Once set, this flag can only be cleared inside `WaitAndReset' // fd. Once set, this flag can only be cleared inside `WaitAndReset'.
void SetLogOff(); void SetLogOff();
bool IsLogOff() const;
// Check Whether the state is in app level health checking state or // Check Whether the socket is available for user requests.
// not, which means this socket would not be selected in further bool IsAvailable() const;
// user request until app level check succeed.
bool IsAppHealthCheck() const;
// Start to process edge-triggered events from the fd. // Start to process edge-triggered events from the fd.
// This function does not block caller. // This function does not block caller.
......
...@@ -241,12 +241,9 @@ inline void Socket::SetLogOff() { ...@@ -241,12 +241,9 @@ inline void Socket::SetLogOff() {
} }
} }
inline bool Socket::IsLogOff() const { inline bool Socket::IsAvailable() const {
return _logoff_flag.load(butil::memory_order_relaxed); return !_logoff_flag.load(butil::memory_order_relaxed) &&
} (_ninflight_app_health_check.load(butil::memory_order_relaxed) == 0);
inline bool Socket::IsAppHealthCheck() const {
return (_ninflight_app_health_check.load(butil::memory_order_relaxed) != 0);
} }
static const uint32_t EOF_FLAG = (1 << 31); static const uint32_t EOF_FLAG = (1 << 31);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment