Commit 7abe1fd6 authored by gejun's avatar gejun

Refine interfaces of PeriodicTask(not done yet)

parent 93fcc5b5
...@@ -20,15 +20,14 @@ ...@@ -20,15 +20,14 @@
namespace brpc { namespace brpc {
PeriodicTask::~PeriodicTask() { PeriodicTask::~PeriodicTask() {
} }
static void* PeriodicTaskThread(void* arg) { static void* PeriodicTaskThread(void* arg) {
PeriodicTask* task = static_cast<PeriodicTask*>(arg); PeriodicTask* task = static_cast<PeriodicTask*>(arg);
timespec abstime; timespec abstime;
if (!task->DoPeriodicTask(&abstime)) { // end if (!task->OnTriggeringTask(&abstime)) { // end
task->DoPeriodicTask(NULL); task->OnDestroyingTask();
return NULL; return NULL;
} }
PeriodicTaskManager::StartTaskAt(task, abstime); PeriodicTaskManager::StartTaskAt(task, abstime);
...@@ -41,7 +40,7 @@ static void RunPeriodicTaskThread(void* arg) { ...@@ -41,7 +40,7 @@ static void RunPeriodicTaskThread(void* arg) {
&th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg); &th, &BTHREAD_ATTR_NORMAL, PeriodicTaskThread, arg);
if (rc != 0) { if (rc != 0) {
LOG(ERROR) << "Fail to start PeriodicTaskThread"; LOG(ERROR) << "Fail to start PeriodicTaskThread";
static_cast<PeriodicTask*>(arg)->DoPeriodicTask(NULL); static_cast<PeriodicTask*>(arg)->OnDestroyingTask();
return; return;
} }
} }
...@@ -56,7 +55,7 @@ void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstim ...@@ -56,7 +55,7 @@ void PeriodicTaskManager::StartTaskAt(PeriodicTask* task, const timespec& abstim
&timer_id, abstime, RunPeriodicTaskThread, task); &timer_id, abstime, RunPeriodicTaskThread, task);
if (rc != 0) { if (rc != 0) {
LOG(ERROR) << "Fail to add timer for RunPerodicTaskThread"; LOG(ERROR) << "Fail to add timer for RunPerodicTaskThread";
task->DoPeriodicTask(NULL); task->OnDestroyingTask();
return; return;
} }
} }
......
...@@ -19,20 +19,20 @@ ...@@ -19,20 +19,20 @@
namespace brpc { namespace brpc {
// Override OnTriggeringTask() with code that needs to be periodically run. If
// Override DoPeriodicTask() with code that needs to be periodically run. If
// the task is completed, the method should return false; Otherwise the method // the task is completed, the method should return false; Otherwise the method
// should return true and set `next_abstime' to the time that the task should // should return true and set `next_abstime' to the time that the task should
// be run next time. // be run next time.
// Each call to DoPeriodicTask() is run in a separated bthread which can be // Each call to OnTriggeringTask() is run in a separated bthread which can be
// suspended. To preserve states between different calls, put the states as // suspended. To preserve states between different calls, put the states as
// fields of (subclass of) PeriodicTask. // fields of (subclass of) PeriodicTask.
// If any error occurs or DoPeriodicTask() returns false, the task is called // If any error occurs or OnTriggeringTask() returns false, the task is called
// with DoPeriodicTask(NULL) and will not be scheduled anymore. // with OnDestroyingTask() and will not be scheduled anymore.
class PeriodicTask { class PeriodicTask {
public: public:
virtual ~PeriodicTask(); virtual ~PeriodicTask();
virtual bool DoPeriodicTask(timespec* next_abstime) = 0; virtual bool OnTriggeringTask(timespec* next_abstime) = 0;
virtual void OnDestroyingTask() = 0;
}; };
class PeriodicTaskManager { class PeriodicTaskManager {
......
...@@ -777,7 +777,8 @@ void Socket::Revive() { ...@@ -777,7 +777,8 @@ void Socket::Revive() {
class HealthCheckTask : public PeriodicTask { class HealthCheckTask : public PeriodicTask {
public: public:
explicit HealthCheckTask(SocketId id) : _id(id) , _first_time(true) {} explicit HealthCheckTask(SocketId id) : _id(id) , _first_time(true) {}
bool DoPeriodicTask(timespec* next_abstime); bool OnTriggeringTask(timespec* next_abstime) override;
void OnDestroyingTask() override;
private: private:
SocketId _id; SocketId _id;
...@@ -829,10 +830,13 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { ...@@ -829,10 +830,13 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// Do health-checking even if we're not connected before, needed // Do health-checking even if we're not connected before, needed
// by Channel to revive never-connected socket when server side // by Channel to revive never-connected socket when server side
// comes online. // comes online.
// FIXME(gejun): the initial delay should be related to uncommited
// CircuitBreaker and shorter for occasional errors and longer for
// frequent errors.
if (_health_check_interval_s > 0) { if (_health_check_interval_s > 0) {
PeriodicTaskManager::StartTaskAt( PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()), new HealthCheckTask(id()),
butil::milliseconds_from_now(_health_check_interval_s * 500)); butil::milliseconds_from_now(0)/*FIXME*/);
} }
// Wake up all threads waiting on EPOLLOUT when closing fd // Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed); _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
...@@ -930,11 +934,11 @@ int Socket::Status(SocketId id, int32_t* nref) { ...@@ -930,11 +934,11 @@ int Socket::Status(SocketId id, int32_t* nref) {
return -1; return -1;
} }
bool HealthCheckTask::DoPeriodicTask(timespec* next_abstime) { void HealthCheckTask::OnDestroyingTask() {
if (next_abstime == NULL) { delete this;
delete this; }
return true;
} bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
const int rc = Socket::AddressFailedAsWell(_id, &ptr); const int rc = Socket::AddressFailedAsWell(_id, &ptr);
CHECK(rc != 0); CHECK(rc != 0);
......
...@@ -668,7 +668,7 @@ private: ...@@ -668,7 +668,7 @@ private:
int _preferred_index; int _preferred_index;
// Number of HC since the last SetFailed() was called. Set to 0 when the // Number of HC since the last SetFailed() was called. Set to 0 when the
// socket is revived. Only set in HealthCheckTask::DoPeriodicTask() // socket is revived. Only set in HealthCheckTask::OnTriggeringTask()
int _hc_count; int _hc_count;
// Size of current incomplete message, set to 0 on complete. // Size of current incomplete message, set to 0 on complete.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment