Unverified Commit 1ebba7f7 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #694 from zyearn/health_check_by_rpc_call

Health check by rpc call
parents 7965e9e5 5ce7363b
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*.rej *.rej
/output /output
/test/output /test/output
build/
# Ignore hidden files # Ignore hidden files
.* .*
......
...@@ -242,7 +242,7 @@ locality-aware,优先选择延时低的下游,直到其延时高于其他机 ...@@ -242,7 +242,7 @@ locality-aware,优先选择延时低的下游,直到其延时高于其他机
| ------------------------- | ----- | ---------------------------------------- | ----------------------- | | ------------------------- | ----- | ---------------------------------------- | ----------------------- |
| health_check_interval (R) | 3 | seconds between consecutive health-checkings | src/brpc/socket_map.cpp | | health_check_interval (R) | 3 | seconds between consecutive health-checkings | src/brpc/socket_map.cpp |
一旦server被连接上,它会恢复为可用状态。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。 在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,框架会发送一个HTTP GET请求到该server,请求路径通过-health\_check\_path设置(默认为空),只有当server返回200时,它才会恢复。在两种健康检查机制下,都可通过-health\_check\_timeout\_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。
# 发起访问 # 发起访问
......
...@@ -556,7 +556,7 @@ int Channel::Weight() { ...@@ -556,7 +556,7 @@ int Channel::Weight() {
int Channel::CheckHealth() { int Channel::CheckHealth() {
if (_lb == NULL) { if (_lb == NULL) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0) { if (Socket::Address(_server_id, &ptr) == 0 && ptr->IsAvailable()) {
return 0; return 0;
} }
return -1; return -1;
......
...@@ -986,7 +986,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -986,7 +986,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Don't use _current_call.peer_id which is set to -1 after construction // Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call. // of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock); const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff()) { if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64, SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id); endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP tmp_sock.reset(); // Release ref ASAP
......
...@@ -138,6 +138,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); ...@@ -138,6 +138,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16); static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17); static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18); static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
public: public:
Controller(); Controller();
...@@ -583,6 +584,10 @@ private: ...@@ -583,6 +584,10 @@ private:
CallId id = { _correlation_id.value + nretry + 1 }; CallId id = { _correlation_id.value + nretry + 1 };
return id; return id;
} }
// Returns true when FLAGS_HEALTH_CHECK_CALL is set on this controller,
// i.e. this particular call was marked as a health-check call.
bool is_health_check_call() const {
    return has_flag(FLAGS_HEALTH_CHECK_CALL);
}
public: public:
CallId current_id() const { CallId current_id() const {
CallId id = { _correlation_id.value + _current_call.nretry + 1 }; CallId id = { _correlation_id.value + _current_call.nretry + 1 };
......
...@@ -138,6 +138,11 @@ public: ...@@ -138,6 +138,11 @@ public:
return *this; return *this;
} }
// Adds Controller::FLAGS_HEALTH_CHECK_CALL to the wrapped controller and
// returns this accessor so that further setters can be chained.
ControllerPrivateAccessor& set_health_check_call() {
    _cntl->add_flag(Controller::FLAGS_HEALTH_CHECK_CALL);
    return *this;
}
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
// Jiashun Zhu(zhujiashun@baidu.com)
#include "brpc/details/health_check.h"
#include "brpc/socket.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/global.h"
#include "brpc/log.h"
#include "bthread/unstable.h"
#include "bthread/bthread.h"
namespace brpc {
// Declared at socket.cpp
extern SocketVarsCollector* g_vars;
// Flags controlling the application-level health check.
// NOTE: adjacent string literals are concatenated verbatim, so every piece
// must carry its own trailing space to keep the help text readable.
DEFINE_string(health_check_path, "", "Http path of health check call. "
              "By default health check succeeds if the server is connectable. "
              "If this flag is set, health check is not completed until a http "
              "call to the path succeeds within -health_check_timeout_ms (to make "
              "sure the server functions well).");
DEFINE_int32(health_check_timeout_ms, 500, "The timeout for both establishing "
             "the connection and the http call to -health_check_path over the connection");
// A Channel whose target is an already-created socket: Init() stores the
// given SocketId as the channel's server id instead of resolving an address.
class HealthCheckChannel : public brpc::Channel {
public:
    HealthCheckChannel() {}
    ~HealthCheckChannel() {}

    // Applies `options' and binds this channel to socket `id'.
    // Returns 0 on success, -1 if the options could not be initialized.
    int Init(SocketId id, const ChannelOptions* options);
};
// Initializes the channel with `options' and points it at socket `id'.
// Returns 0 on success; -1 when InitChannelOptions() rejects the options,
// in which case the server id is left untouched.
int HealthCheckChannel::Init(SocketId id, const ChannelOptions* options) {
    brpc::GlobalInitializeOrDie();
    const int rc = InitChannelOptions(options);
    if (rc == 0) {
        _server_id = id;
        return 0;
    }
    return -1;
}
// Completion callback of one application-level health-check RPC. It also
// carries the state needed to re-issue the call: the channel, the
// controller and timing information.
class OnAppHealthCheckDone : public google::protobuf::Closure {
public:
    // Invoked when the health-check RPC finishes (see the definition).
    // Marked `override' for consistency with HealthCheckTask.
    void Run() override;

    // Channel bound to the socket being checked.
    HealthCheckChannel channel;
    // Controller reused for every attempt (Reset() before each call).
    brpc::Controller cntl;
    // Id of the socket being checked.
    SocketId id;
    // Interval between consecutive checks, in seconds.
    int64_t interval_s;
    // Wall-clock time (ms) at which the latest attempt was issued.
    int64_t last_check_time_ms;
};
// Static helpers driving the application-level (RPC based) health check.
class HealthCheckManager {
public:
    // Prepares a health-check channel/closure for socket `id' and issues
    // the first health-check call.
    static void StartCheck(SocketId id, int64_t check_interval_s);

    // Issues one health-check call. `arg' must point to an
    // OnAppHealthCheckDone. Always returns NULL.
    static void* AppCheck(void* arg);
};
// Starts application-level health checking of socket `id'.
// Builds an OnAppHealthCheckDone (closure + channel + controller), configures
// an HTTP channel with no retries and a timeout of
// min(-health_check_timeout_ms, check_interval_s * 1000), then sends the
// first call through AppCheck(). If the channel cannot be initialized, the
// in-flight counter of the socket is decremented and the closure destroyed.
void HealthCheckManager::StartCheck(SocketId id, int64_t check_interval_s) {
    SocketUniquePtr sock;
    if (Socket::AddressFailedAsWell(id, &sock) < 0) {
        RPC_VLOG << "SocketId=" << id
                 << " was abandoned during health checking";
        return;
    }
    LOG(INFO) << "Checking path=" << sock->remote_side() << FLAGS_health_check_path;

    // Owned here until it is handed over to AppCheck().
    std::unique_ptr<OnAppHealthCheckDone> done(new OnAppHealthCheckDone);
    done->id = id;
    done->interval_s = check_interval_s;

    brpc::ChannelOptions options;
    options.protocol = PROTOCOL_HTTP;
    options.max_retry = 0;
    const int64_t interval_ms = check_interval_s * 1000;
    options.timeout_ms = std::min(
        static_cast<int64_t>(FLAGS_health_check_timeout_ms), interval_ms);

    if (done->channel.Init(id, &options) == 0) {
        AppCheck(done.release());
        return;
    }
    LOG(WARNING) << "Fail to init health check channel to SocketId=" << id;
    sock->_ninflight_app_health_check.fetch_sub(
        1, butil::memory_order_relaxed);
    // `done' is destroyed when leaving the scope.
}
void* HealthCheckManager::AppCheck(void* arg) {
OnAppHealthCheckDone* done = static_cast<OnAppHealthCheckDone*>(arg);
done->cntl.Reset();
done->cntl.http_request().uri() = FLAGS_health_check_path;
ControllerPrivateAccessor(&done->cntl).set_health_check_call();
done->last_check_time_ms = butil::gettimeofday_ms();
done->channel.CallMethod(NULL, &done->cntl, NULL, NULL, done);
return NULL;
}
void OnAppHealthCheckDone::Run() {
std::unique_ptr<OnAppHealthCheckDone> self_guard(this);
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(id, &ptr);
if (rc < 0) {
RPC_VLOG << "SocketId=" << id
<< " was abandoned during health checking";
return;
}
if (!cntl.Failed() || ptr->Failed()) {
LOG_IF(INFO, !cntl.Failed()) << "Succeeded to call "
<< ptr->remote_side() << FLAGS_health_check_path;
// if ptr->Failed(), previous SetFailed would trigger next round
// of hc, just return here.
ptr->_ninflight_app_health_check.fetch_sub(
1, butil::memory_order_relaxed);
return;
}
RPC_VLOG << "Fail to check path=" << FLAGS_health_check_path
<< ", " << cntl.ErrorText();
int64_t sleep_time_ms =
last_check_time_ms + interval_s * 1000 - butil::gettimeofday_ms();
if (sleep_time_ms > 0) {
// TODO(zhujiashun): we need to handle the case when timer fails
// and bthread_usleep returns immediately. In most situations,
// the possibility of this case is quite small, so currently we
// just keep sending the hc call.
bthread_usleep(sleep_time_ms * 1000);
}
HealthCheckManager::AppCheck(self_guard.release());
}
// Periodic task performing connection-level health checking of one socket.
class HealthCheckTask : public PeriodicTask {
public:
    explicit HealthCheckTask(SocketId id);

    // Runs one round of health checking (see the definition).
    bool OnTriggeringTask(timespec* next_abstime) override;

    // Deletes this task.
    void OnDestroyingTask() override;

private:
    // Id of the socket being checked.
    SocketId _id;
    // True until the first round has run.
    bool _first_time;
};
// Creates a task for socket `id'; the first triggering is flagged so that
// OnTriggeringTask() can do its one-time WaitAndReset().
HealthCheckTask::HealthCheckTask(SocketId id)
    : _id(id), _first_time(true) {
}
// Runs one round of connection-level health checking for socket `_id'.
//
// Returns true after storing the time of the next round in `*next_abstime'
// (the check failed and should be repeated). Returns false when no further
// round is wanted: the socket was abandoned, checking was cancelled, or the
// check succeeded and the socket was revived.
//
// When the check succeeds and -health_check_path is non-empty, an
// application-level check is started as well; the socket's in-flight counter
// is incremented *before* Revive() so that the socket is never observed as
// available between Revive() and the start of the application-level check.
bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
    SocketUniquePtr ptr;
    const int rc = Socket::AddressFailedAsWell(_id, &ptr);
    // The socket is not expected to be addressable as a healthy one here.
    CHECK(rc != 0);
    if (rc < 0) {
        RPC_VLOG << "SocketId=" << _id
                 << " was abandoned before health checking";
        return false;
    }
    // Note: Making a Socket re-addressable is hard. An alternative is
    // creating another Socket with selected internal fields to replace
    // failed Socket. Although it avoids concurrent issues with in-place
    // revive, it changes SocketId: many code need to watch SocketId
    // and update on change, which is impractical. Another issue with
    // this method is that it has to move "selected internal fields"
    // which may be accessed in parallel, not trivial to be moved.
    // Finally we choose a simple-enough solution: wait until the
    // reference count hits `expected_nref', which basically means no
    // one is addressing the Socket(except here). Because the Socket
    // is not addressable, the reference count will not increase
    // again. This solution is not perfect because the `expected_nref'
    // is implementation specific. In our case, one reference comes
    // from SocketMapInsert(socket_map.cpp), one reference is here.
    // Although WaitAndReset() could hang when someone is addressing
    // the failed Socket forever (also indicating bug), this is not an
    // issue in current code.
    if (_first_time) {  // Only check at first time.
        _first_time = false;
        if (ptr->WaitAndReset(2/*note*/) != 0) {
            LOG(INFO) << "Cancel checking " << *ptr;
            return false;
        }
    }

    // g_vars must not be NULL because it is newed at the creation of
    // first Socket. When g_vars is used, the socket is at health-checking
    // state, which means the socket must be created and then g_vars can
    // not be NULL.
    g_vars->nhealthcheck << 1;
    int hc = 0;
    if (ptr->_user) {
        hc = ptr->_user->CheckHealth(ptr.get());
    } else {
        hc = ptr->CheckHealth();
    }
    if (hc == 0) {
        if (ptr->CreatedByConnect()) {
            g_vars->channel_conn << -1;
        }
        // Read the flag exactly once: the increment of the in-flight counter
        // and the call to StartCheck() (which is responsible for the matching
        // decrement) must agree, otherwise a flag change in between would
        // leave the counter unbalanced and the socket unavailable forever.
        const bool need_app_check = !FLAGS_health_check_path.empty();
        if (need_app_check) {
            ptr->_ninflight_app_health_check.fetch_add(
                    1, butil::memory_order_relaxed);
        }
        ptr->Revive();
        ptr->_hc_count = 0;
        if (need_app_check) {
            HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
        }
        return false;
    } else if (hc == ESTOP) {
        LOG(INFO) << "Cancel checking " << *ptr;
        return false;
    }
    // The check failed: count it and schedule the next round.
    ++ptr->_hc_count;
    *next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
    return true;
}
// The task owns itself: it is heap-allocated in StartHealthCheck() and
// released here.
void HealthCheckTask::OnDestroyingTask() {
    delete this;
}
// Schedules a new HealthCheckTask for socket `id' whose first run is
// `delay_ms' milliseconds from now. The task deletes itself in
// OnDestroyingTask().
void StartHealthCheck(SocketId id, int64_t delay_ms) {
    HealthCheckTask* const task = new HealthCheckTask(id);
    PeriodicTaskManager::StartTaskAt(
        task, butil::milliseconds_from_now(delay_ms));
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
// Jiashun Zhu(zhujiashun@baidu.com)
// Include guard. Identifiers starting with an underscore followed by an
// uppercase letter are reserved for the implementation, so a
// project-prefixed name is used.
#ifndef BRPC_DETAILS_HEALTH_CHECK_H
#define BRPC_DETAILS_HEALTH_CHECK_H
#include "brpc/socket_id.h"
#include "brpc/periodic_task.h"
#include "bvar/bvar.h"
#include "brpc/socket.h"
namespace brpc {
// Start health check for socket id after delay_ms.
// If delay_ms <= 0, HealthCheck would be started
// immediately.
void StartHealthCheck(SocketId id, int64_t delay_ms);
} // namespace brpc
#endif  // BRPC_DETAILS_HEALTH_CHECK_H
...@@ -310,7 +310,7 @@ int ConsistentHashingLoadBalancer::SelectServer( ...@@ -310,7 +310,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
if (((i + 1) == s->size() // always take last chance if (((i + 1) == s->size() // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id)) || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
&& Socket::Address(choice->server_sock.id, out->ptr) == 0 && Socket::Address(choice->server_sock.id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && (*out->ptr)->IsAvailable()) {
return 0; return 0;
} else { } else {
if (++choice == s->end()) { if (++choice == s->end()) {
......
...@@ -303,7 +303,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -303,7 +303,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
continue; continue;
} }
} else if (Socket::Address(info.server_id, out->ptr) == 0 } else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && (*out->ptr)->IsAvailable()) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again. // choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) { || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
......
...@@ -118,7 +118,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -118,7 +118,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (((i + 1) == n // always take last chance if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && (*out->ptr)->IsAvailable()) {
// We found an available server // We found an available server
return 0; return 0;
} }
......
...@@ -122,7 +122,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -122,7 +122,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (((i + 1) == n // always take last chance if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id)) || !ExcludedServers::IsExcluded(in.excluded, id))
&& Socket::Address(id, out->ptr) == 0 && Socket::Address(id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && (*out->ptr)->IsAvailable()) {
s.tls() = tls; s.tls() = tls;
return 0; return 0;
} }
......
...@@ -180,7 +180,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -180,7 +180,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
SocketId server_id = GetServerInNextStride(s->server_list, filter, tls_temp); SocketId server_id = GetServerInNextStride(s->server_list, filter, tls_temp);
if (!ExcludedServers::IsExcluded(in.excluded, server_id) if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0 && Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && (*out->ptr)->IsAvailable()) {
// update tls. // update tls.
tls.remain_server = tls_temp.remain_server; tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position; tls.position = tls_temp.position;
......
...@@ -56,7 +56,7 @@ public: ...@@ -56,7 +56,7 @@ public:
void AfterRevived(Socket* ptr) { void AfterRevived(Socket* ptr) {
LOG(INFO) << "Revived " << *chan << " chan=0x" << (void*)chan LOG(INFO) << "Revived " << *chan << " chan=0x" << (void*)chan
<< " Fake" << *ptr; << " Fake" << *ptr << " (Connectable)";
} }
}; };
......
This diff is collapsed.
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "brpc/options.pb.h" // ConnectionType #include "brpc/options.pb.h" // ConnectionType
#include "brpc/socket_id.h" // SocketId #include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr #include "brpc/socket_message.h" // SocketMessagePtr
#include "bvar/bvar.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -126,6 +127,28 @@ struct SocketStat { ...@@ -126,6 +127,28 @@ struct SocketStat {
uint32_t out_num_messages_m; uint32_t out_num_messages_m;
}; };
// Groups the bvar counters related to sockets. Each variable is constructed
// with the exposed name passed in the constructor below ("rpc_*").
// Members are initialized in declaration order, so every Adder is
// constructed before the PerSecond window that is built from its address;
// keep the declaration order intact when adding members.
struct SocketVarsCollector {
SocketVarsCollector()
: nsocket("rpc_socket_count")
, channel_conn("rpc_channel_connection_count")
, neventthread_second("rpc_event_thread_second", &neventthread)
, nhealthcheck("rpc_health_check_count")
, nkeepwrite_second("rpc_keepwrite_second", &nkeepwrite)
, nwaitepollout("rpc_waitepollout_count")
, nwaitepollout_second("rpc_waitepollout_second", &nwaitepollout)
{}
// Exposed as "rpc_socket_count".
bvar::Adder<int64_t> nsocket;
// Exposed as "rpc_channel_connection_count"; HealthCheckTask subtracts 1
// when it revives a socket for which CreatedByConnect() is true.
bvar::Adder<int64_t> channel_conn;
// Not exposed by itself; source of `neventthread_second'.
bvar::Adder<int> neventthread;
// Per-second window over `neventthread' ("rpc_event_thread_second").
bvar::PerSecond<bvar::Adder<int> > neventthread_second;
// Exposed as "rpc_health_check_count"; HealthCheckTask adds 1 for every
// round of health checking it performs.
bvar::Adder<int64_t> nhealthcheck;
// Not exposed by itself; source of `nkeepwrite_second'.
bvar::Adder<int64_t> nkeepwrite;
// Per-second window over `nkeepwrite' ("rpc_keepwrite_second").
bvar::PerSecond<bvar::Adder<int64_t> > nkeepwrite_second;
// Exposed as "rpc_waitepollout_count".
bvar::Adder<int64_t> nwaitepollout;
// Per-second window over `nwaitepollout' ("rpc_waitepollout_second").
bvar::PerSecond<bvar::Adder<int64_t> > nwaitepollout_second;
};
struct PipelinedInfo { struct PipelinedInfo {
PipelinedInfo() { reset(); } PipelinedInfo() { reset(); }
void reset() { void reset() {
...@@ -186,6 +209,8 @@ friend class policy::ConsistentHashingLoadBalancer; ...@@ -186,6 +209,8 @@ friend class policy::ConsistentHashingLoadBalancer;
friend class policy::RtmpContext; friend class policy::RtmpContext;
friend class schan::ChannelBalancer; friend class schan::ChannelBalancer;
friend class HealthCheckTask; friend class HealthCheckTask;
friend class OnAppHealthCheckDone;
friend class HealthCheckManager;
friend class policy::H2GlobalStreamCreator; friend class policy::H2GlobalStreamCreator;
class SharedPart; class SharedPart;
struct Forbidden {}; struct Forbidden {};
...@@ -347,11 +372,13 @@ public: ...@@ -347,11 +372,13 @@ public:
// Set ELOGOFF flag to this `Socket' which means further requests // Set ELOGOFF flag to this `Socket' which means further requests
// through this `Socket' will receive an ELOGOFF error. This only // through this `Socket' will receive an ELOGOFF error. This only
// affects return value of `IsLogOff' and won't close the inner fd // affects return value of `IsAvailable' and won't close the inner
// Once set, this flag can only be cleared inside `WaitAndReset' // fd. Once set, this flag can only be cleared inside `WaitAndReset'.
void SetLogOff(); void SetLogOff();
bool IsLogOff() const;
// Check Whether the socket is available for user requests.
bool IsAvailable() const;
// Start to process edge-triggered events from the fd. // Start to process edge-triggered events from the fd.
// This function does not block caller. // This function does not block caller.
static int StartInputEvent(SocketId id, uint32_t events, static int StartInputEvent(SocketId id, uint32_t events,
...@@ -790,6 +817,8 @@ private: ...@@ -790,6 +817,8 @@ private:
butil::Mutex _stream_mutex; butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set; std::set<StreamId> *_stream_set;
butil::atomic<int64_t> _ninflight_app_health_check;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -241,8 +241,9 @@ inline void Socket::SetLogOff() { ...@@ -241,8 +241,9 @@ inline void Socket::SetLogOff() {
} }
} }
inline bool Socket::IsLogOff() const { inline bool Socket::IsAvailable() const {
return _logoff_flag.load(butil::memory_order_relaxed); return !_logoff_flag.load(butil::memory_order_relaxed) &&
(_ninflight_app_health_check.load(butil::memory_order_relaxed) == 0);
} }
static const uint32_t EOF_FLAG = (1 << 31); static const uint32_t EOF_FLAG = (1 << 31);
......
...@@ -14,7 +14,8 @@ set(TEST_PROTO_FILES addressbook1.proto ...@@ -14,7 +14,8 @@ set(TEST_PROTO_FILES addressbook1.proto
snappy_message.proto snappy_message.proto
v1.proto v1.proto
v2.proto v2.proto
grpc.proto) grpc.proto
health_check.proto)
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/test/hdrs) file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/test/hdrs)
set(PROTOC_FLAGS ${PROTOC_FLAGS} -I${CMAKE_SOURCE_DIR}/src) set(PROTOC_FLAGS ${PROTOC_FLAGS} -I${CMAKE_SOURCE_DIR}/src)
compile_proto(PROTO_HDRS PROTO_SRCS ${CMAKE_BINARY_DIR}/test compile_proto(PROTO_HDRS PROTO_SRCS ${CMAKE_BINARY_DIR}/test
......
...@@ -706,7 +706,7 @@ TEST_F(HttpTest, read_long_body_progressively) { ...@@ -706,7 +706,7 @@ TEST_F(HttpTest, read_long_body_progressively) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
// the socket still holds a ref. // the socket still holds a ref.
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
...@@ -794,7 +794,7 @@ TEST_F(HttpTest, read_progressively_after_cntl_destroys) { ...@@ -794,7 +794,7 @@ TEST_F(HttpTest, read_progressively_after_cntl_destroys) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
} }
// Wait for recycling of the main socket. // Wait for recycling of the main socket.
...@@ -843,7 +843,7 @@ TEST_F(HttpTest, read_progressively_after_long_delay) { ...@@ -843,7 +843,7 @@ TEST_F(HttpTest, read_progressively_after_long_delay) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
} }
...@@ -883,7 +883,7 @@ TEST_F(HttpTest, skip_progressive_reading) { ...@@ -883,7 +883,7 @@ TEST_F(HttpTest, skip_progressive_reading) {
ASSERT_EQ(0, svc.last_errno()); ASSERT_EQ(0, svc.last_errno());
LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes; LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes;
// The server side still wrote things. // The server side still wrote things.
ASSERT_GT(new_written_bytes - old_written_bytes, 100000); ASSERT_GT(new_written_bytes - old_written_bytes, (size_t)100000);
} }
class AlwaysFailRead : public brpc::ProgressiveReader { class AlwaysFailRead : public brpc::ProgressiveReader {
...@@ -954,7 +954,7 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) { ...@@ -954,7 +954,7 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
// the socket still holds a ref. // the socket still holds a ref.
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
......
...@@ -654,11 +654,11 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -654,11 +654,11 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
} }
std::cout << std::endl; std::cout << std::endl;
// Check whether slected result is consistent with expected. // Check whether slected result is consistent with expected.
EXPECT_EQ(3, select_result.size()); EXPECT_EQ((size_t)3, select_result.size());
for (const auto& result : select_result) { for (const auto& result : select_result) {
std::cout << result.first << " result=" << result.second std::cout << result.first << " result=" << result.second
<< " configured=" << configed_weight[result.first] << std::endl; << " configured=" << configed_weight[result.first] << std::endl;
EXPECT_EQ(result.second, configed_weight[result.first]); EXPECT_EQ(result.second, (size_t)configed_weight[result.first]);
} }
} }
...@@ -703,4 +703,83 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) { ...@@ -703,4 +703,83 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
brpc::ExcludedServers::Destroy(exclude); brpc::ExcludedServers::Destroy(exclude);
} }
// Verifies that load balancers skip sockets that have an in-flight
// application-level health check (_ninflight_app_health_check != 0):
//  1. with both servers untouched, selection succeeds;
//  2. with servers[0] marked as being checked, only servers[1] is selected;
//  3. with both marked, selection fails with EHOSTDOWN;
//  4. after clearing both marks, both servers are selected again.
// The scenario is repeated for the round-robin, randomized and weighted
// round-robin balancers.
TEST_F(LoadBalancerTest, health_check_no_valid_server) {
const char* servers[] = {
"10.92.115.19:8832",
"10.42.122.201:8833",
};
std::vector<brpc::LoadBalancer*> lbs;
lbs.push_back(new brpc::policy::RoundRobinLoadBalancer);
lbs.push_back(new brpc::policy::RandomizedLoadBalancer);
lbs.push_back(new brpc::policy::WeightedRoundRobinLoadBalancer);
for (int i = 0; i < (int)lbs.size(); ++i) {
brpc::LoadBalancer* lb = lbs[i];
std::vector<brpc::ServerId> ids;
// Create one socket per server and register it in the balancer.
// NOTE(review): this inner `i' shadows the loop variable of the
// enclosing loop.
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
brpc::ServerId id(8888);
brpc::SocketOptions options;
options.remote_side = dummy;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
// The tag is presumably the weight read by the weighted balancer.
id.tag = "50";
ids.push_back(id);
lb->AddServer(id);
}
// Without setting anything, the lb should work fine
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
}
// Put servers[0] into the "health check in flight" state.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
// After putting server[0] into health check state, the only choice is servers[1]
ASSERT_EQ(ptr->remote_side().port, 8833);
}
// Put servers[1] into the same state as well.
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->_ninflight_app_health_check.store(1, butil::memory_order_relaxed);
for (int i = 0; i < 4; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
// There is no server available
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
}
// Clear the state of both servers.
ASSERT_EQ(0, brpc::Socket::Address(ids[0].id, &ptr));
ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
ASSERT_EQ(0, brpc::Socket::Address(ids[1].id, &ptr));
ptr->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// After reset health check state, the lb should work fine
bool get_server1 = false;
bool get_server2 = false;
for (int i = 0; i < 20; ++i) {
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
if (ptr->remote_side().port == 8832) {
get_server1 = true;
} else {
get_server2 = true;
}
}
// Both servers must have been chosen at least once in 20 selections.
ASSERT_TRUE(get_server1 && get_server2);
// NOTE(review): not reached when an ASSERT above fails, in which case
// the balancers are leaked.
delete lb;
}
}
} //namespace } //namespace
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <fcntl.h> // F_GETFD #include <fcntl.h> // F_GETFD
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <gflags/gflags.h>
#include "butil/gperftools_profiler.h" #include "butil/gperftools_profiler.h"
#include "butil/time.h" #include "butil/time.h"
#include "butil/macros.h" #include "butil/macros.h"
...@@ -18,7 +19,12 @@ ...@@ -18,7 +19,12 @@
#include "brpc/acceptor.h" #include "brpc/acceptor.h"
#include "brpc/policy/hulu_pbrpc_protocol.h" #include "brpc/policy/hulu_pbrpc_protocol.h"
#include "brpc/policy/most_common_message.h" #include "brpc/policy/most_common_message.h"
#include "brpc/policy/http_rpc_protocol.h"
#include "brpc/nshead.h" #include "brpc/nshead.h"
#include "brpc/server.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "health_check.pb.h"
#if defined(OS_MACOSX) #if defined(OS_MACOSX)
#include <sys/event.h> #include <sys/event.h>
#endif #endif
...@@ -29,6 +35,10 @@ namespace bthread { ...@@ -29,6 +35,10 @@ namespace bthread {
extern TaskControl* g_task_control; extern TaskControl* g_task_control;
} }
namespace brpc {
DECLARE_int32(health_check_interval);
}
void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base); void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base);
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
...@@ -522,6 +532,89 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { ...@@ -522,6 +532,89 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
ASSERT_EQ(-1, brpc::Socket::Status(id)); ASSERT_EQ(-1, brpc::Socket::Status(id));
} }
// Test service answering "OK". While `_sleep_flag' is true every request is
// delayed by 510ms before being answered.
class HealthCheckTestServiceImpl : public test::HealthCheckTestService {
public:
    HealthCheckTestServiceImpl()
        : _sleep_flag(true) {}
    virtual ~HealthCheckTestServiceImpl() {}

    virtual void default_method(google::protobuf::RpcController* cntl_base,
                                const test::HealthCheckRequest* request,
                                test::HealthCheckResponse* response,
                                google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
        if (_sleep_flag) {
            // 510ms, a little bit longer than the default timeout of
            // health check rpc
            bthread_usleep(510000);
        }
        cntl->response_attachment().append("OK");
    }

    // When true, requests are stalled before being answered.
    bool _sleep_flag;
};
// End-to-end test of the application-level health check:
//  1. a call to port 7777 is refused while nothing is listening there;
//  2. a bare listening socket is opened and then closed, so the socket is
//     connected and fails again while health-check calls are being sent;
//  3. a server whose handler stalls longer than the health-check timeout is
//     started: user calls must keep failing with EHOSTDOWN;
//  4. once the handler stops stalling, the channel must recover.
// The test changes -health_check_path and -health_check_interval and
// restores them at the end.
// NOTE(review): the flags are not restored when an ASSERT fails midway.
TEST_F(SocketTest, app_level_health_check) {
int old_health_check_interval = brpc::FLAGS_health_check_interval;
GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
butil::EndPoint point(butil::IP_ANY, 7777);
brpc::ChannelOptions options;
options.protocol = "http";
options.max_retry = 0;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init(point, &options));
// No server has been started yet: the first call is expected to be refused.
{
brpc::Controller cntl;
cntl.http_request().uri() = "/";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
EXPECT_TRUE(cntl.Failed());
ASSERT_EQ(ECONNREFUSED, cntl.ErrorCode());
}
// 2s to make sure remote is connected by HealthCheckTask and enter the
// sending-rpc state. Because the remote is not down, so hc rpc would keep
// sending.
int listening_fd = tcp_listen(point);
bthread_usleep(2000000);
// 2s to make sure HealthCheckTask find socket is failed and correct impl
// should trigger next round of hc
close(listening_fd);
bthread_usleep(2000000);
// Start a real server; its handler stalls while _sleep_flag is true.
brpc::Server server;
HealthCheckTestServiceImpl hc_service;
ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(point, NULL));
for (int i = 0; i < 4; ++i) {
// although ::connect would succeed, the stall in hc_service makes
// the health check rpc fail.
brpc::Controller cntl;
cntl.http_request().uri() = "/";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_EQ(EHOSTDOWN, cntl.ErrorCode());
bthread_usleep(1000000 /*1s*/);
}
// Let the handler answer immediately from now on.
hc_service._sleep_flag = false;
bthread_usleep(2000000 /* a little bit longer than hc rpc timeout + hc interval */);
// should recover now
{
brpc::Controller cntl;
cntl.http_request().uri() = "/";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_GT(cntl.response_attachment().size(), (size_t)0);
}
// Restore the flags changed at the beginning of the test.
GFLAGS_NS::SetCommandLineOption("health_check_path", "");
char hc_buf[8];
snprintf(hc_buf, sizeof(hc_buf), "%d", old_health_check_interval);
GFLAGS_NS::SetCommandLineOption("health_check_interval", hc_buf);
}
TEST_F(SocketTest, health_check) { TEST_F(SocketTest, health_check) {
// FIXME(gejun): Messenger has to be new otherwise quitting may crash. // FIXME(gejun): Messenger has to be new otherwise quitting may crash.
brpc::Acceptor* messenger = new brpc::Acceptor; brpc::Acceptor* messenger = new brpc::Acceptor;
......
// Messages and service used by the application-level health check test.
syntax = "proto2";

option cc_generic_services = true;

package test;

// Both messages are intentionally empty.
message HealthCheckRequest {}
message HealthCheckResponse {}

service HealthCheckTestService {
    rpc default_method(HealthCheckRequest) returns (HealthCheckResponse);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment