Commit 6cb01912 authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: wrap HealthCheckTask

parent 5a282109
...@@ -152,10 +152,22 @@ void OnAppHealthCheckDone::Run() { ...@@ -152,10 +152,22 @@ void OnAppHealthCheckDone::Run() {
self_guard.release(); self_guard.release();
} }
HealthCheckTask::HealthCheckTask(SocketId id, bvar::Adder<int64_t>* nhealthcheck) class HealthCheckTask : public PeriodicTask {
public:
explicit HealthCheckTask(SocketId id, SocketVarsCollector* nhealthcheck);
bool OnTriggeringTask(timespec* next_abstime) override;
void OnDestroyingTask() override;
private:
SocketId _id;
bool _first_time;
SocketVarsCollector* _collector;
};
HealthCheckTask::HealthCheckTask(SocketId id, SocketVarsCollector* collector)
: _id(id) : _id(id)
, _first_time(true) , _first_time(true)
, _nhealthcheck(nhealthcheck) {} , _collector(collector) {}
bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
...@@ -191,7 +203,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -191,7 +203,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
} }
} }
(*_nhealthcheck) << 1; _collector->nhealthcheck << 1;
int hc = 0; int hc = 0;
if (ptr->_user) { if (ptr->_user) {
hc = ptr->_user->CheckHealth(ptr.get()); hc = ptr->_user->CheckHealth(ptr.get());
...@@ -200,7 +212,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) { ...@@ -200,7 +212,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
} }
if (hc == 0) { if (hc == 0) {
if (ptr->CreatedByConnect()) { if (ptr->CreatedByConnect()) {
(*_nhealthcheck) << -1; _collector->channel_conn << -1;
} }
if (!FLAGS_health_check_path.empty()) { if (!FLAGS_health_check_path.empty()) {
ptr->_ninflight_app_health_check.fetch_add( ptr->_ninflight_app_health_check.fetch_add(
...@@ -225,4 +237,8 @@ void HealthCheckTask::OnDestroyingTask() { ...@@ -225,4 +237,8 @@ void HealthCheckTask::OnDestroyingTask() {
delete this; delete this;
} }
PeriodicTask* NewHealthCheckTask(SocketId id, SocketVarsCollector* collector) {
return new HealthCheckTask(id, collector);
}
} // namespace brpc } // namespace brpc
...@@ -21,20 +21,11 @@ ...@@ -21,20 +21,11 @@
#include "brpc/socket_id.h" #include "brpc/socket_id.h"
#include "brpc/periodic_task.h" #include "brpc/periodic_task.h"
#include "bvar/bvar.h" #include "bvar/bvar.h"
#include "brpc/socket.h"
namespace brpc { namespace brpc {
class HealthCheckTask : public PeriodicTask { PeriodicTask* NewHealthCheckTask(SocketId id, SocketVarsCollector* collector);
public:
explicit HealthCheckTask(SocketId id, bvar::Adder<int64_t>* nhealthcheck);
bool OnTriggeringTask(timespec* next_abstime) override;
void OnDestroyingTask() override;
private:
SocketId _id;
bool _first_time;
bvar::Adder<int64_t>* _nhealthcheck;
};
} // namespace brpc } // namespace brpc
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include "butil/logging.h" // CHECK #include "butil/logging.h" // CHECK
#include "butil/macros.h" #include "butil/macros.h"
#include "butil/class_name.h" // butil::class_name #include "butil/class_name.h" // butil::class_name
#include "bvar/bvar.h"
#include "brpc/log.h" #include "brpc/log.h"
#include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG #include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG
#include "brpc/errno.pb.h" #include "brpc/errno.pb.h"
...@@ -272,33 +271,11 @@ void Socket::SharedPart::UpdateStatsEverySecond(int64_t now_ms) { ...@@ -272,33 +271,11 @@ void Socket::SharedPart::UpdateStatsEverySecond(int64_t now_ms) {
} }
} }
struct SocketVarsCollector { SocketVarsCollector* g_vars = NULL;
SocketVarsCollector()
: nsocket("rpc_socket_count")
, channel_conn("rpc_channel_connection_count")
, neventthread_second("rpc_event_thread_second", &neventthread)
, nhealthcheck("rpc_health_check_count")
, nkeepwrite_second("rpc_keepwrite_second", &nkeepwrite)
, nwaitepollout("rpc_waitepollout_count")
, nwaitepollout_second("rpc_waitepollout_second", &nwaitepollout)
{}
bvar::Adder<int64_t> nsocket;
bvar::Adder<int64_t> channel_conn;
bvar::Adder<int> neventthread;
bvar::PerSecond<bvar::Adder<int> > neventthread_second;
bvar::Adder<int64_t> nhealthcheck;
bvar::Adder<int64_t> nkeepwrite;
bvar::PerSecond<bvar::Adder<int64_t> > nkeepwrite_second;
bvar::Adder<int64_t> nwaitepollout;
bvar::PerSecond<bvar::Adder<int64_t> > nwaitepollout_second;
};
static SocketVarsCollector* s_vars = NULL;
static pthread_once_t s_create_vars_once = PTHREAD_ONCE_INIT; static pthread_once_t s_create_vars_once = PTHREAD_ONCE_INIT;
static void CreateVars() { static void CreateVars() {
s_vars = new SocketVarsCollector; g_vars = new SocketVarsCollector;
} }
void Socket::CreateVarsOnce() { void Socket::CreateVarsOnce() {
...@@ -307,8 +284,8 @@ void Socket::CreateVarsOnce() { ...@@ -307,8 +284,8 @@ void Socket::CreateVarsOnce() {
// Used by ConnectionService // Used by ConnectionService
int64_t GetChannelConnectionCount() { int64_t GetChannelConnectionCount() {
if (s_vars) { if (g_vars) {
return s_vars->channel_conn.get_value(); return g_vars->channel_conn.get_value();
} }
return 0; return 0;
} }
...@@ -612,7 +589,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) { ...@@ -612,7 +589,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
LOG(FATAL) << "Fail to get_resource<Socket>"; LOG(FATAL) << "Fail to get_resource<Socket>";
return -1; return -1;
} }
s_vars->nsocket << 1; g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed)); CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
m->_nevent.store(0, butil::memory_order_relaxed); m->_nevent.store(0, butil::memory_order_relaxed);
m->_keytable_pool = options.keytable_pool; m->_keytable_pool = options.keytable_pool;
...@@ -717,7 +694,7 @@ int Socket::WaitAndReset(int32_t expected_nref) { ...@@ -717,7 +694,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
} }
close(prev_fd); close(prev_fd);
if (CreatedByConnect()) { if (CreatedByConnect()) {
s_vars->channel_conn << -1; g_vars->channel_conn << -1;
} }
} }
_local_side = butil::EndPoint(); _local_side = butil::EndPoint();
...@@ -861,7 +838,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { ...@@ -861,7 +838,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
if (_health_check_interval_s > 0) { if (_health_check_interval_s > 0) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt( PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id(), &s_vars->nhealthcheck), NewHealthCheckTask(id(), g_vars),
butil::milliseconds_from_now(GetOrNewSharedPart()-> butil::milliseconds_from_now(GetOrNewSharedPart()->
circuit_breaker.isolation_duration_ms())); circuit_breaker.isolation_duration_ms()));
} }
...@@ -997,7 +974,7 @@ void Socket::OnRecycle() { ...@@ -997,7 +974,7 @@ void Socket::OnRecycle() {
} }
close(prev_fd); close(prev_fd);
if (create_by_connect) { if (create_by_connect) {
s_vars->channel_conn << -1; g_vars->channel_conn << -1;
} }
} }
reset_parsing_context(NULL); reset_parsing_context(NULL);
...@@ -1032,7 +1009,7 @@ void Socket::OnRecycle() { ...@@ -1032,7 +1009,7 @@ void Socket::OnRecycle() {
} }
} }
s_vars->nsocket << -1; g_vars->nsocket << -1;
} }
void* Socket::ProcessEvent(void* arg) { void* Socket::ProcessEvent(void* arg) {
...@@ -1248,7 +1225,7 @@ int Socket::CheckConnected(int sockfd) { ...@@ -1248,7 +1225,7 @@ int Socket::CheckConnected(int sockfd) {
<< " via fd=" << (int)sockfd << " SocketId=" << id() << " via fd=" << (int)sockfd << " SocketId=" << id()
<< " local_port=" << ntohs(client.sin_port); << " local_port=" << ntohs(client.sin_port);
if (CreatedByConnect()) { if (CreatedByConnect()) {
s_vars->channel_conn << 1; g_vars->channel_conn << 1;
} }
// Doing SSL handshake after TCP connected // Doing SSL handshake after TCP connected
return SSLHandshake(sockfd, false); return SSLHandshake(sockfd, false);
...@@ -1617,7 +1594,7 @@ FAIL_TO_WRITE: ...@@ -1617,7 +1594,7 @@ FAIL_TO_WRITE:
static const size_t DATA_LIST_MAX = 256; static const size_t DATA_LIST_MAX = 256;
void* Socket::KeepWrite(void* void_arg) { void* Socket::KeepWrite(void* void_arg) {
s_vars->nkeepwrite << 1; g_vars->nkeepwrite << 1;
WriteRequest* req = static_cast<WriteRequest*>(void_arg); WriteRequest* req = static_cast<WriteRequest*>(void_arg);
SocketUniquePtr s(req->socket); SocketUniquePtr s(req->socket);
...@@ -1657,7 +1634,7 @@ void* Socket::KeepWrite(void* void_arg) { ...@@ -1657,7 +1634,7 @@ void* Socket::KeepWrite(void* void_arg) {
// Update(8/15/2017): Not working, performance downgraded. // Update(8/15/2017): Not working, performance downgraded.
//if (nw <= 0 || req->data.empty()/*note*/) { //if (nw <= 0 || req->data.empty()/*note*/) {
if (nw <= 0) { if (nw <= 0) {
s_vars->nwaitepollout << 1; g_vars->nwaitepollout << 1;
bool pollin = (s->_on_edge_triggered_events != NULL); bool pollin = (s->_on_edge_triggered_events != NULL);
// NOTE: Waiting epollout within timeout is a must to force // NOTE: Waiting epollout within timeout is a must to force
// KeepWrite to check and setup pending WriteRequests periodically, // KeepWrite to check and setup pending WriteRequests periodically,
...@@ -1972,7 +1949,7 @@ int Socket::StartInputEvent(SocketId id, uint32_t events, ...@@ -1972,7 +1949,7 @@ int Socket::StartInputEvent(SocketId id, uint32_t events,
// According to the stats, above fetch_add is very effective. In a // According to the stats, above fetch_add is very effective. In a
// server processing 1 million requests per second, this counter // server processing 1 million requests per second, this counter
// is just 1500~1700/s // is just 1500~1700/s
s_vars->neventthread << 1; g_vars->neventthread << 1;
bthread_t tid; bthread_t tid;
// transfer ownership as well, don't use s anymore! // transfer ownership as well, don't use s anymore!
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "brpc/options.pb.h" // ConnectionType #include "brpc/options.pb.h" // ConnectionType
#include "brpc/socket_id.h" // SocketId #include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr #include "brpc/socket_message.h" // SocketMessagePtr
#include "bvar/bvar.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -126,6 +127,28 @@ struct SocketStat { ...@@ -126,6 +127,28 @@ struct SocketStat {
uint32_t out_num_messages_m; uint32_t out_num_messages_m;
}; };
struct SocketVarsCollector {
SocketVarsCollector()
: nsocket("rpc_socket_count")
, channel_conn("rpc_channel_connection_count")
, neventthread_second("rpc_event_thread_second", &neventthread)
, nhealthcheck("rpc_health_check_count")
, nkeepwrite_second("rpc_keepwrite_second", &nkeepwrite)
, nwaitepollout("rpc_waitepollout_count")
, nwaitepollout_second("rpc_waitepollout_second", &nwaitepollout)
{}
bvar::Adder<int64_t> nsocket;
bvar::Adder<int64_t> channel_conn;
bvar::Adder<int> neventthread;
bvar::PerSecond<bvar::Adder<int> > neventthread_second;
bvar::Adder<int64_t> nhealthcheck;
bvar::Adder<int64_t> nkeepwrite;
bvar::PerSecond<bvar::Adder<int64_t> > nkeepwrite_second;
bvar::Adder<int64_t> nwaitepollout;
bvar::PerSecond<bvar::Adder<int64_t> > nwaitepollout_second;
};
struct PipelinedInfo { struct PipelinedInfo {
PipelinedInfo() { reset(); } PipelinedInfo() { reset(); }
void reset() { void reset() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment