Commit 2dc9cbad authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: add single server

parent 51259c0b
......@@ -15,6 +15,7 @@
*.rej
/output
/test/output
build/
# Ignore hidden files
.*
......
......@@ -281,6 +281,15 @@ int Channel::Init(butil::EndPoint server_addr_and_port,
return InitSingle(server_addr_and_port, "", options);
}
// Connect this channel directly to an existing Socket identified by `id'.
// Unlike the EndPoint overloads, no address resolution or socket creation
// happens here: the channel simply records the id as its single server.
// Returns 0 on success, -1 when the options fail validation.
int Channel::Init(SocketId id, const ChannelOptions* options) {
    // Process-wide one-time initialization; aborts the process on failure.
    GlobalInitializeOrDie();
    // Validate and copy user-supplied options into this channel.
    const int rc = InitChannelOptions(options);
    if (rc != 0) {
        return -1;
    }
    _server_id = id;
    return 0;
}
int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
const char* raw_server_address,
const ChannelOptions* options) {
......
......@@ -155,6 +155,7 @@ public:
int Init(butil::EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
int Init(SocketId id, const ChannelOptions* options);
// Connect this channel to a group of servers whose addresses can be
// accessed via `naming_service_url' according to its protocol. Use the
......
......@@ -982,15 +982,26 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.need_feedback = false;
_current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
SocketUniquePtr tmp_sock;
bool health_check_call = has_flag(FLAGS_HEALTH_CHECK_CALL);
if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff()) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
if (!health_check_call) {
const int rc = Socket::Address(_single_server_id, &tmp_sock);
if (rc != 0 || tmp_sock->IsLogOff()) {
SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
}
} else {
const int rc = Socket::AddressFailedAsWell(_single_server_id, &tmp_sock);
if (rc < 0) {
SetFailed(EFAILEDSOCKET, "Socket to %s has been recycled, server_id=%" PRIu64,
endpoint2str(_remote_side).c_str(), _single_server_id);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
}
}
_current_call.peer_id = _single_server_id;
} else {
......
......@@ -138,6 +138,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
public:
Controller();
......@@ -324,6 +325,10 @@ public:
bool is_done_allowed_to_run_in_place() const
{ return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
// Marks this RPC as an internal health-checking call. When set, IssueRPC
// addresses the target via Socket::AddressFailedAsWell so the probe can be
// sent over a socket that has already been SetFailed.
// TODO(zhujiashun): confirm final semantics before release.
void set_health_check_call(bool f) { set_flag(FLAGS_HEALTH_CHECK_CALL, f); }
bool has_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); }
// ------------------------------------------------------------------------
// Server-side methods.
// These calls shall be made from the server side only. Their results are
......
......@@ -48,6 +48,8 @@
#include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
......@@ -92,6 +94,10 @@ DEFINE_int32(connect_timeout_as_unreachable, 3,
"times *continuously*, the error is changed to ENETUNREACH which "
"fails the main socket as well when this socket is pooled.");
// Replace the placeholder "todo" help strings with real descriptions so
// that -help/-flags output is meaningful to operators.
DEFINE_bool(health_check_using_rpc, false,
            "If true, health-check a broken socket by sending an HTTP call "
            "(see -health_check_path) instead of by merely re-connecting");
DEFINE_string(health_check_path, "/health",
              "The URI path of the HTTP call issued by RPC-based health "
              "checking (only effective when -health_check_using_rpc is on)");
DEFINE_int32(health_check_timeout_ms, 300,
             "Timeout in milliseconds of the HTTP call issued by RPC-based "
             "health checking");
// Validator for -connect_timeout_as_unreachable: the count must be at least
// 2 and below an (arbitrary, large-enough) upper bound of 1000.
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
    if (v < 2) {
        return false;
    }
    return v < 1000;  // large enough
}
......@@ -473,6 +479,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
//, _health_checking_using_rpc(false)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
......@@ -655,6 +662,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
//m->_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
......@@ -775,6 +783,7 @@ void Socket::Revive() {
}
// Set this flag to true since we add additional ref again
_recycle_flag.store(false, butil::memory_order_relaxed);
//_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
if (_user) {
_user->AfterRevived(this);
} else {
......@@ -865,6 +874,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if (_health_check_interval_s > 0) {
//!_health_checking_using_rpc.load(butil::memory_order_relaxed)) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
......@@ -1024,6 +1034,32 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
if (ptr->CreatedByConnect()) {
s_vars->channel_conn << -1;
}
if (FLAGS_health_check_using_rpc) {
//ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
brpc::ChannelOptions options;
options.protocol = "http";
options.max_retry = 0;
options.timeout_ms = FLAGS_health_check_timeout_ms;
brpc::Channel channel;
if (channel.Init(_id, &options) != 0) {
++ ptr->_hc_count;
*next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
return true;
}
brpc::Controller cntl;
cntl.http_request().uri() = FLAGS_health_check_path;
cntl.set_health_check_call(true);
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(WARNING) << "Fail to health check using rpc, error="
<< cntl.ErrorText();
++ ptr->_hc_count;
*next_abstime = butil::seconds_from_now(ptr->_health_check_interval_s);
return true;
}
LOG(INFO) << "Succeed to health check using rpc";
}
ptr->Revive();
ptr->_hc_count = 0;
return false;
......
......@@ -348,6 +348,9 @@ public:
// Once set, this flag can only be cleared inside `WaitAndReset'
void SetLogOff();
bool IsLogOff() const;
// Returns true if an RPC-based health check is currently in progress on
// this socket (see _health_checking_using_rpc); such a socket should not
// start another health check.
bool IsHealthCheckingUsingRPC() const;
// Start to process edge-triggered events from the fd.
// This function does not block caller.
......@@ -790,6 +793,11 @@ private:
butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
// If this flag is set, the socket is currently being used for an RPC-based
// health check and must not start another health check.
butil::atomic<bool> _health_checking_using_rpc;
};
} // namespace brpc
......
......@@ -245,6 +245,10 @@ inline bool Socket::IsLogOff() const {
return _logoff_flag.load(butil::memory_order_relaxed);
}
inline bool Socket::IsHealthCheckingUsingRPC() const {
return _health_checking_using_rpc.load(butil::memory_order_relaxed);
}
static const uint32_t EOF_FLAG = (1 << 31);
inline void Socket::PostponeEOF() {
......
......@@ -14,7 +14,8 @@ set(TEST_PROTO_FILES addressbook1.proto
snappy_message.proto
v1.proto
v2.proto
grpc.proto)
grpc.proto
health_check.proto)
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/test/hdrs)
set(PROTOC_FLAGS ${PROTOC_FLAGS} -I${CMAKE_SOURCE_DIR}/src)
compile_proto(PROTO_HDRS PROTO_SRCS ${CMAKE_BINARY_DIR}/test
......
......@@ -18,7 +18,10 @@
#include "brpc/acceptor.h"
#include "brpc/policy/hulu_pbrpc_protocol.h"
#include "brpc/policy/most_common_message.h"
#include "brpc/policy/http_rpc_protocol.h"
#include "brpc/nshead.h"
#include "brpc/server.h"
#include "health_check.pb.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
......@@ -522,6 +525,111 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
ASSERT_EQ(-1, brpc::Socket::Status(id));
}
// Test service whose handler can be made artificially slow: while
// _sleep_flag is true it sleeps 310ms, slightly longer than the default
// -health_check_timeout_ms (300ms), so the RPC-based health check times out;
// once the flag is cleared the handler returns immediately and the health
// check succeeds.
class HealthCheckTestServiceImpl : public test::HealthCheckTestService {
public:
    HealthCheckTestServiceImpl()
        : _sleep_flag(true) {}
    virtual ~HealthCheckTestServiceImpl() {}

    virtual void default_method(google::protobuf::RpcController* cntl_base,
                                const test::HealthCheckRequest* request,
                                test::HealthCheckResponse* response,
                                google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        // The server always passes a brpc::Controller; use static_cast
        // rather than a C-style cast.
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
        LOG(INFO) << "In HealthCheckTestServiceImpl, flag=" << _sleep_flag;
        if (_sleep_flag) {
            bthread_usleep(310000 /* 310ms, a little bit longer than the default
                                     timeout of health checking rpc */);
        } else {
            LOG(INFO) << "Return fast!";
        }
        cntl->response_attachment().append("OK");
    }

    // Written by the test thread and read by server worker threads without
    // synchronization. NOTE(review): benign for this test, but an atomic
    // would be cleaner — TODO confirm.
    bool _sleep_flag;
};
// End-to-end test of RPC-based health checking (-health_check_using_rpc):
// 1) write to a socket whose server is down -> socket fails (ECONNREFUSED);
// 2) bring the server up with a handler that stalls longer than the
//    health-check timeout -> socket stays broken despite a live listener;
// 3) make the handler return fast -> the health-check RPC succeeds and the
//    socket revives.
TEST_F(SocketTest, health_check_using_rpc) {
    GFLAGS_NS::SetCommandLineOption("health_check_using_rpc", "true");
    // The health-check HTTP call is sent to this path; it maps to
    // HealthCheckTestService's default_method on the server started below.
    GFLAGS_NS::SetCommandLineOption("health_check_path", "/HealthCheckTestService");
    brpc::SocketId id = 8888;  // initial value is overwritten by Socket::Create
    // Nobody listens on 7777 yet, so the first write must fail to connect.
    butil::EndPoint point(butil::IP_ANY, 7777);
    const int kCheckInteval = 1;
    brpc::SocketOptions options;
    options.remote_side = point;
    options.user = new CheckRecycle;
    // Non-zero interval enables the periodic HealthCheckTask after SetFailed.
    options.health_check_interval_s = kCheckInteval/*s*/;
    ASSERT_EQ(0, brpc::Socket::Create(options, &id));
    brpc::SocketUniquePtr s;
    ASSERT_EQ(0, brpc::Socket::Address(id, &s));
    global_sock = s.get();
    ASSERT_TRUE(global_sock);
    const char* buf = "GET / HTTP/1.1\r\nHost: brpc.com\r\n\r\n";
    // Randomly exercise both Write() overloads: SocketMessagePtr vs IOBuf.
    const bool use_my_message = (butil::fast_rand_less_than(2) == 0);
    brpc::SocketMessagePtr<MyMessage> msg;
    int appended_msg = 0;
    butil::IOBuf src;
    if (use_my_message) {
        LOG(INFO) << "Use MyMessage";
        msg.reset(new MyMessage(buf, strlen(buf), &appended_msg));
    } else {
        src.append(buf, strlen(buf));
        ASSERT_EQ(strlen(buf), src.length());
    }
#ifdef CONNECT_IN_KEEPWRITE
    // Connecting happens in the KeepWrite bthread: wait on an id for the
    // asynchronous connect failure to be reported.
    bthread_id_t wait_id;
    WaitData data;
    ASSERT_EQ(0, bthread_id_create2(&wait_id, &data, OnWaitIdReset));
    brpc::Socket::WriteOptions wopt;
    wopt.id_wait = wait_id;
    if (use_my_message) {
        ASSERT_EQ(0, s->Write(msg, &wopt));
    } else {
        ASSERT_EQ(0, s->Write(&src, &wopt));
    }
    ASSERT_EQ(0, bthread_id_join(wait_id));
    ASSERT_EQ(wait_id.value, data.id.value);
    ASSERT_EQ(ECONNREFUSED, data.error_code);
    ASSERT_TRUE(butil::StringPiece(data.error_text).starts_with(
                    "Fail to connect "));
    if (use_my_message) {
        // MyMessage::AppendAndDestroySelf must have run even on failure.
        ASSERT_TRUE(appended_msg);
    }
#else
    // Synchronous connect path: Write itself fails with ECONNREFUSED.
    if (use_my_message) {
        ASSERT_EQ(-1, s->Write(msg));
    } else {
        ASSERT_EQ(-1, s->Write(&src));
    }
    ASSERT_EQ(ECONNREFUSED, errno);
#endif
    ASSERT_TRUE(src.empty());
    ASSERT_EQ(-1, s->fd());
    ASSERT_TRUE(global_sock);
    // The socket is now SetFailed: Address() on it must fail while health
    // checking keeps it alive in the background.
    brpc::SocketUniquePtr invalid_ptr;
    ASSERT_EQ(-1, brpc::Socket::Address(id, &invalid_ptr));
    brpc::Server server;
    HealthCheckTestServiceImpl hc_service;
    ASSERT_EQ(0, server.AddService(&hc_service, brpc::SERVER_DOESNT_OWN_SERVICE));
    ASSERT_EQ(0, server.Start("127.0.0.1:7777", NULL));
    for (int i = 0; i < 3; ++i) {
        // although ::connect would succeed, the stall in hc_service makes
        // the health checking rpc fail.
        ASSERT_EQ(1, brpc::Socket::Status(id));
        bthread_usleep(1000000 /*1s*/);
    }
    // Let the handler return fast; the next health-check RPC should succeed.
    hc_service._sleep_flag = false;
    bthread_usleep(2000000);
    // recover
    ASSERT_EQ(0, brpc::Socket::Status(id));
    // Restore the global flag so later tests are unaffected.
    GFLAGS_NS::SetCommandLineOption("health_check_using_rpc", "false");
}
TEST_F(SocketTest, health_check) {
// FIXME(gejun): Messenger has to be new otherwise quitting may crash.
brpc::Acceptor* messenger = new brpc::Acceptor;
......
syntax="proto2";
// Generate abstract C++ service classes so the unittest can subclass
// HealthCheckTestService and register it with a brpc::Server.
option cc_generic_services = true;
package test;
// Empty request/response: the health-check call carries no payload; the
// test handler writes its reply into the HTTP response attachment instead.
message HealthCheckRequest {};
message HealthCheckResponse {};
// Service used by socket_unittest to exercise RPC-based health checking;
// default_method is reachable via the HTTP path /HealthCheckTestService.
service HealthCheckTestService {
    rpc default_method(HealthCheckRequest) returns (HealthCheckResponse);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment