Commit 413db42c authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: wrap to StartHealthCheckWithDelayMS

parent 6cb01912
...@@ -27,6 +27,9 @@ ...@@ -27,6 +27,9 @@
namespace brpc { namespace brpc {
// declared at socket.cpp
extern SocketVarsCollector* g_vars;
DEFINE_string(health_check_path, "", "Http path of health check call." DEFINE_string(health_check_path, "", "Http path of health check call."
"By default health check succeeds if the server is connectable." "By default health check succeeds if the server is connectable."
"If this flag is set, health check is not completed until a http " "If this flag is set, health check is not completed until a http "
...@@ -154,20 +157,18 @@ void OnAppHealthCheckDone::Run() { ...@@ -154,20 +157,18 @@ void OnAppHealthCheckDone::Run() {
class HealthCheckTask : public PeriodicTask { class HealthCheckTask : public PeriodicTask {
public: public:
explicit HealthCheckTask(SocketId id, SocketVarsCollector* nhealthcheck); explicit HealthCheckTask(SocketId id);
bool OnTriggeringTask(timespec* next_abstime) override; bool OnTriggeringTask(timespec* next_abstime) override;
void OnDestroyingTask() override; void OnDestroyingTask() override;
private: private:
SocketId _id; SocketId _id;
bool _first_time; bool _first_time;
SocketVarsCollector* _collector;
}; };
HealthCheckTask::HealthCheckTask(SocketId id, SocketVarsCollector* collector) HealthCheckTask::HealthCheckTask(SocketId id)
: _id(id) : _id(id)
, _first_time(true) , _first_time(true) {}
, _collector(collector) {}
bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
...@@ -203,7 +204,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -203,7 +204,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
} }
} }
_collector->nhealthcheck << 1; g_vars->nhealthcheck << 1;
int hc = 0; int hc = 0;
if (ptr->_user) { if (ptr->_user) {
hc = ptr->_user->CheckHealth(ptr.get()); hc = ptr->_user->CheckHealth(ptr.get());
...@@ -212,7 +213,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -212,7 +213,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
} }
if (hc == 0) { if (hc == 0) {
if (ptr->CreatedByConnect()) { if (ptr->CreatedByConnect()) {
_collector->channel_conn << -1; g_vars->channel_conn << -1;
} }
if (!FLAGS_health_check_path.empty()) { if (!FLAGS_health_check_path.empty()) {
ptr->_ninflight_app_health_check.fetch_add( ptr->_ninflight_app_health_check.fetch_add(
...@@ -237,8 +238,9 @@ void HealthCheckTask::OnDestroyingTask() { ...@@ -237,8 +238,9 @@ void HealthCheckTask::OnDestroyingTask() {
delete this; delete this;
} }
PeriodicTask* NewHealthCheckTask(SocketId id, SocketVarsCollector* collector) { void StartHealthCheckWithDelayMS(SocketId id, int64_t delay_ms) {
return new HealthCheckTask(id, collector); PeriodicTaskManager::StartTaskAt(new HealthCheckTask(id),
butil::milliseconds_from_now(delay_ms));
} }
} // namespace brpc } // namespace brpc
...@@ -25,7 +25,8 @@ ...@@ -25,7 +25,8 @@
namespace brpc { namespace brpc {
PeriodicTask* NewHealthCheckTask(SocketId id, SocketVarsCollector* collector); // Start health check for socket id after delay_ms.
void StartHealthCheckWithDelayMS(SocketId id, int64_t delay_ms);
} // namespace brpc } // namespace brpc
......
...@@ -837,10 +837,8 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { ...@@ -837,10 +837,8 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// comes online. // comes online.
if (_health_check_interval_s > 0) { if (_health_check_interval_s > 0) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt( StartHealthCheckWithDelayMS(id(),
NewHealthCheckTask(id(), g_vars), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
butil::milliseconds_from_now(GetOrNewSharedPart()->
circuit_breaker.isolation_duration_ms()));
} }
// Wake up all threads waiting on EPOLLOUT when closing fd // Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed); _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment