Commit d5bc479e authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: make hc rpc async

parent 400d8c61
......@@ -880,11 +880,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// Do health-checking even if we're not connected before, needed
// by Channel to revive never-connected socket when server side
// comes online.
if (_health_check_interval_s > 0 &&
// We don't want to start another health check task while
// the socket is in health checking using rpc state.
// Also see comment in HealthCheckTask::OnTriggeringTask
!_health_checking_using_rpc.load(butil::memory_order_relaxed)) {
if (_health_check_interval_s > 0) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
......@@ -1016,6 +1012,72 @@ int HealthCheckChannel::Init(SocketId id, const ChannelOptions* options) {
return 0;
}
class OnHealthCheckRPCDone : public google::protobuf::Closure {
public:
void Run() {
std::unique_ptr<OnHealthCheckRPCDone> self_guard(this);
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(id, &ptr);
if (rc < 0) {
RPC_VLOG << "SocketId=" << id
<< " was abandoned during health checking";
return;
}
if (!cntl.Failed()) {
ptr->ResetHealthCheckingUsingRPC();
return;
}
// Socket::SetFailed() will trigger next round of hc, just
// return here.
if (cntl.Failed() && ptr->Failed()) {
return;
}
// the left case is cntl.Failed() && !ptr->Failed(),
// in which we should retry hc rpc.
RPC_VLOG << "Fail to health check using rpc, error="
<< cntl.ErrorText();
bthread_usleep(interval_s * 1000000);
cntl.Reset();
cntl.http_request().uri() = FLAGS_health_check_path;
cntl.set_health_check_call(true);
channel.CallMethod(NULL, &cntl, NULL, NULL, self_guard.release());
}
HealthCheckChannel channel;
brpc::Controller cntl;
SocketId id;
int64_t interval_s;
};
class HealthCheckManager {
public:
static void StartCheck(SocketId id, int64_t check_interval_s) {
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(id, &ptr);
if (rc < 0) {
RPC_VLOG << "SocketId=" << id
<< " was abandoned during health checking";
return;
}
OnHealthCheckRPCDone* done = new OnHealthCheckRPCDone;
done->id = id;
done->interval_s = check_interval_s;
brpc::ChannelOptions options;
options.protocol = "http";
options.max_retry = 0;
options.timeout_ms = FLAGS_health_check_timeout_ms;
if (done->channel.Init(id, &options) != 0) {
LOG(WARNING) << "Fail to init health check channel to SocketId=" << id;
ptr->ResetHealthCheckingUsingRPC();
return;
}
done->cntl.http_request().uri() = FLAGS_health_check_path;
done->cntl.set_health_check_call(true);
done->channel.CallMethod(NULL, &done->cntl, NULL, NULL, done);
}
};
bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(_id, &ptr);
......@@ -1064,47 +1126,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
ptr->Revive();
ptr->_hc_count = 0;
if (ptr->IsHealthCheckingUsingRPC()) {
brpc::ChannelOptions options;
options.protocol = "http";
options.max_retry = 0;
options.timeout_ms = FLAGS_health_check_timeout_ms;
HealthCheckChannel channel;
if (channel.Init(_id, &options) != 0) {
ptr->SetFailed();
++ ptr->_hc_count;
*next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
return true;
}
brpc::Controller cntl;
cntl.http_request().uri() = FLAGS_health_check_path;
cntl.set_health_check_call(true);
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
RPC_VLOG << "Fail to health check using rpc, error="
<< cntl.ErrorText();
// the hc rpc above may fail too, we should handle this case
// carefully. If this rpc fails, hc must be triggered again.
// One solution is to trigger the second hc in Socket::SetFailed
// in rpc code path, but rpc fails doesn't mean socket fails,
// so we should call Socket::SetFailed[1] explicitly here.
// But there is a race here:
// If the second hc succeed, the socket is revived and comes back
// to normal, after that, [1] is called here, making socket failed
// again, which is not an expected case.
//
// Another solution is to forbid hc while socket is in health check
// using rpc state. So there would be no second hc memtioned above,
// and this task should return true to trigger next round hc. There
// is no race in this solution.
//
// We take the second solution here, which is a clear and simple
// solution.
ptr->SetFailed(); // [1]
++ ptr->_hc_count;
*next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
return true;
}
ptr->ResetHealthCheckingUsingRPC();
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
}
return false;
} else if (hc == ESTOP) {
......
......@@ -186,6 +186,8 @@ friend class policy::ConsistentHashingLoadBalancer;
friend class policy::RtmpContext;
friend class schan::ChannelBalancer;
friend class HealthCheckTask;
friend class OnHealthCheckRPCDone;
friend class HealthCheckManager;
friend class policy::H2GlobalStreamCreator;
class SharedPart;
struct Forbidden {};
......
......@@ -560,11 +560,12 @@ TEST_F(SocketTest, health_check_using_rpc) {
GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
GFLAGS_NS::SetCommandLineOption("health_check_interval", "1");
butil::EndPoint point(butil::IP_ANY, 7777);
brpc::ChannelOptions options;
options.protocol = "http";
options.max_retry = 0;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:7777", &options));
ASSERT_EQ(0, channel.Init(point, &options));
{
brpc::Controller cntl;
cntl.http_request().uri() = "/";
......@@ -572,11 +573,22 @@ TEST_F(SocketTest, health_check_using_rpc) {
EXPECT_TRUE(cntl.Failed());
ASSERT_EQ(ECONNREFUSED, cntl.ErrorCode());
}
// 2s to make sure remote is connected by HealthCheckTask and enter the
// sending-rpc state. Because the remote is not down, so hc rpc would keep
// sending.
int listening_fd = tcp_listen(point, false);
bthread_usleep(2000000);
// 2s to make sure HealthCheckTask find socket is failed and correct impl
// should trigger next round of hc
close(listening_fd);
bthread_usleep(2000000);
brpc::Server server;
HealthCheckTestServiceImpl hc_service;
ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start("127.0.0.1:7777", NULL));
ASSERT_EQ(0, server.Start(point, NULL));
for (int i = 0; i < 4; ++i) {
// although ::connect would succeed, the stall in hc_service makes
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment