Unverified Commit 93fcc5b5 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #484 from zyearn/hc-async

asynchronous health checking
parents f912f0db d4022e4d
// Copyright (c) 2018 brpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#include <bthread/bthread.h>
#include <bthread/unstable.h>
#include "brpc/periodic_task.h"
namespace brpc {
PeriodicTask::~PeriodicTask() {
}
static void* PeriodicTaskThread(void* arg) {
PeriodicTask* task = static_cast<PeriodicTask*>(arg);
timespec abstime;
if (!task->DoPeriodicTask(&abstime)) { // end
task->DoPeriodicTask(NULL);
return NULL;
}
PeriodicTaskManager::StartTaskAt(task, abstime);
return NULL;
}
static void RunPeriodicTaskThread(void* arg) {
bthread_t th = 0;
int rc = bthread_start_background(
&th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
if (rc != 0) {
LOG(ERROR) << "Fail to start PeriodicTaskThread";
static_cast<PeriodicTask*>(arg)->DoPeriodicTask(NULL);
return;
}
}
void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstime) {
if (task == NULL) {
LOG(ERROR) << "Param[task] is NULL";
return;
}
bthread_timer_t timer_id;
const int rc = bthread_timer_add(
&timer_id, abstime, RunPeriodicTaskThread, task);
if (rc != 0) {
LOG(ERROR) << "Fail to add timer for RunPerodicTaskThread";
task->DoPeriodicTask(NULL);
return;
}
}
} // namespace brpc
// Copyright (c) 2018 brpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_HEALTH_CHECK_MANAGER_H
#define BRPC_HEALTH_CHECK_MANAGER_H
namespace brpc {
// Override DoPeriodicTask() with code that needs to be periodically run. If
// the task is completed, the method should return false; Otherwise the method
// should return true and set `next_abstime' to the time that the task should
// be run next time.
// Each call to DoPeriodicTask() is run in a separated bthread which can be
// suspended. To preserve states between different calls, put the states as
// fields of (subclass of) PeriodicTask.
// If any error occurs or DoPeriodicTask() returns false, the task is called
// with DoPeriodicTask(NULL) and will not be scheduled anymore.
class PeriodicTask {
public:
virtual ~PeriodicTask();
virtual bool DoPeriodicTask(timespec* next_abstime) = 0;
};
class PeriodicTaskManager {
public:
static void StartTaskAt(PeriodicTask* task, const timespec& abstime);
};
} // namespace brpc
#endif // BRPC_HEALTH_CHECK_MANAGER_H
......@@ -41,6 +41,7 @@
#include "brpc/stream_impl.h"
#include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
......@@ -95,13 +96,6 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;
#ifdef BAIDU_INTERNAL
#define BRPC_AUXTHREAD_ATTR \
(sizeof(com_device_t) > 32*1024 ? BTHREAD_ATTR_NORMAL : BTHREAD_ATTR_SMALL)
#else
#define BRPC_AUXTHREAD_ATTR BTHREAD_ATTR_SMALL
#endif
class BAIDU_CACHELINE_ALIGNMENT SocketPool {
friend class Socket;
public:
......@@ -780,6 +774,16 @@ void Socket::Revive() {
}
}
class HealthCheckTask : public PeriodicTask {
public:
explicit HealthCheckTask(SocketId id) : _id(id) , _first_time(true) {}
bool DoPeriodicTask(timespec* next_abstime);
private:
SocketId _id;
bool _first_time;
};
int Socket::ReleaseAdditionalReference() {
bool expect = false;
// Use `relaxed' fence here since `Dereference' has `released' fence
......@@ -826,10 +830,9 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if (_health_check_interval_s > 0) {
bthread_t th = 0;
int rc = bthread_start_background(
&th, &BRPC_AUXTHREAD_ATTR, HealthCheckThread, (void*)id());
CHECK_EQ(0, rc);
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
butil::milliseconds_from_now(_health_check_interval_s * 500));
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
......@@ -927,28 +930,19 @@ int Socket::Status(SocketId id, int32_t* nref) {
return -1;
}
void* Socket::HealthCheckThread(void* void_arg) {
SocketId socket_id = (SocketId)void_arg;
bool first_time = true;
if (bthread_usleep(100000) < 0) {
PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
return NULL;
bool HealthCheckTask::DoPeriodicTask(timespec* next_abstime) {
if (next_abstime == NULL) {
delete this;
return true;
}
for (;;) {
butil::EndPoint remote_side;
int check_interval_s = 0;
do {
SocketUniquePtr ptr;
const int rc = AddressFailedAsWell(socket_id, &ptr);
const int rc = Socket::AddressFailedAsWell(_id, &ptr);
CHECK(rc != 0);
if (rc < 0) {
RPC_VLOG << "SocketId=" << socket_id
RPC_VLOG << "SocketId=" << _id
<< " was abandoned before health checking";
return NULL;
return false;
}
remote_side = ptr->remote_side();
check_interval_s = ptr->_health_check_interval_s;
// Note: Making a Socket re-addessable is hard. An alternative is
// creating another Socket with selected internal fields to replace
// failed Socket. Although it avoids concurrent issues with in-place
......@@ -966,11 +960,11 @@ void* Socket::HealthCheckThread(void* void_arg) {
// Although WaitAndReset() could hang when someone is addressing
// the failed Socket forever (also indicating bug), this is not an
// issue in current code.
if (first_time) { // Only check at first time.
first_time = false;
if (_first_time) { // Only check at first time.
_first_time = false;
if (ptr->WaitAndReset(2/*note*/) != 0) {
LOG(INFO) << "Cancel checking " << *ptr;
return NULL;
return false;
}
}
......@@ -987,21 +981,14 @@ void* Socket::HealthCheckThread(void* void_arg) {
}
ptr->Revive();
ptr->_hc_count = 0;
return NULL;
return false;
} else if (hc == ESTOP) {
LOG(INFO) << "Cancel checking " << *ptr;
return NULL;
return false;
}
++ ptr->_hc_count;
} while (0);
CHECK_GT(check_interval_s, 0);
if (bthread_usleep(check_interval_s * 1000000L) < 0) {
PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
LOG(INFO) << "Cancel checking SocketId="
<< socket_id << '@' << remote_side;
return NULL;
}
}
*next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
return true;
}
void Socket::OnRecycle() {
......
......@@ -183,6 +183,7 @@ friend class Controller;
friend class policy::ConsistentHashingLoadBalancer;
friend class policy::RtmpContext;
friend class schan::ChannelBalancer;
friend class HealthCheckTask;
class SharedPart;
struct Forbidden {};
struct WriteRequest;
......@@ -523,7 +524,6 @@ friend void DereferenceSocket(Socket*);
int ConnectIfNot(const timespec* abstime, WriteRequest* req);
int ResetFileDescriptor(int fd);
static void* HealthCheckThread(void*);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
......@@ -668,7 +668,7 @@ private:
int _preferred_index;
// Number of HC since the last SetFailed() was called. Set to 0 when the
// socket is revived. Only set in HealthCheckThread
// socket is revived. Only set in HealthCheckTask::DoPeriodicTask()
int _hc_count;
// Size of current incomplete message, set to 0 on complete.
......
......@@ -36,7 +36,7 @@ namespace policy {
void SendRpcResponse(int64_t correlation_id, Controller* cntl,
const google::protobuf::Message* req,
const google::protobuf::Message* res,
const Server* server_raw, MethodStatus *, long);
const Server* server_raw, MethodStatus *, int64_t);
} // policy
} // brpc
......@@ -231,7 +231,7 @@ protected:
const google::protobuf::Message*,
const google::protobuf::Message*,
const brpc::Server*,
brpc::MethodStatus*, long>(
brpc::MethodStatus*, int64_t>(
&brpc::policy::SendRpcResponse,
meta.correlation_id(), cntl, NULL, res,
&ts->_dummy, NULL, -1);
......@@ -748,7 +748,7 @@ protected:
CallMethod(&subchans[0], &cntl, &req, &res, false);
ASSERT_TRUE(cntl.Failed());
ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_EQ("[E2001][127.0.1.1:0]Method ComboEcho() not implemented.", cntl.ErrorText());
ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method ComboEcho() not implemented."));
// do the rpc call.
cntl.Reset();
......
......@@ -394,7 +394,7 @@ TEST(NamingServiceTest, consul_with_backup_file) {
restful_map.c_str()));
ASSERT_EQ(0, server.Start("localhost:8500", NULL));
bthread_usleep(1000000);
bthread_usleep(2000000);
butil::EndPoint n1;
ASSERT_EQ(0, butil::str2endpoint("10.121.36.189:8003", &n1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment