Commit e9fe8b1e authored by zhujiashun's avatar zhujiashun

add discovery client

parent dde83f0c
......@@ -17,6 +17,8 @@
#include <gflags/gflags.h>
#include "butil/third_party/rapidjson/document.h"
#include "butil/string_printf.h"
#include "butil/fast_rand.h"
#include "bthread/bthread.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/policy/discovery_naming_service.h"
......@@ -33,14 +35,22 @@ DEFINE_string(discovery_api_addr, "", "The address of discovery api");
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "Environment of services");
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
DEFINE_int32(discovery_renew_interval_s, 30, "The interval between two consecutive renews");
DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyond"
" which Register would be called again");
int DiscoveryNamingService::ParseNodesResult(
const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document nodes;
static Channel s_discovery_channel;
static pthread_once_t s_init_channel_once = PTHREAD_ONCE_INIT;
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
nodes.Parse(response.c_str());
auto itr = nodes.FindMember("data");
if (itr == nodes.MemberEnd()) {
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr = d.FindMember("data");
if (itr == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery nodes response";
return -1;
}
......@@ -68,13 +78,46 @@ int DiscoveryNamingService::ParseNodesResult(
return 0;
}
int DiscoveryNamingService::ParseFetchsResult(
const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
static void InitChannel() {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(FATAL) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
LOG(FATAL) << "Fail to parse nodes result from discovery api server";
return;
}
if (s_discovery_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << discovery_addr;
return;
}
}
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr_data = d.FindMember("data");
if (itr_data == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery fetchs response";
......@@ -114,6 +157,11 @@ int DiscoveryNamingService::ParseFetchsResult(
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
if (pos != 4 /* sizeof("grpc") */ ||
strncmp("grpc", addr.data(), 4) != 0) {
// Skip server that has prefix but not start with "grpc"
continue;
}
addr.remove_prefix(pos + 3);
}
ServerNode node;
......@@ -132,42 +180,13 @@ int DiscoveryNamingService::ParseFetchsResult(
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
if (!_is_initialized) {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return -1;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return -1;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
return -1;
}
if (_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << discovery_addr;
return -1;
}
_is_initialized = true;
}
pthread_once(&s_init_channel_once, InitChannel);
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
return -1;
......@@ -189,5 +208,269 @@ void DiscoveryNamingService::Destroy() {
delete this;
}
bool DiscoveryRegisterParam::IsValid() const {
if (appid.empty() || hostname.empty() || addrs.empty() ||
env.empty() || zone.empty() || version.empty()) {
return false;
}
return true;
}
DiscoveryClient::DiscoveryClient()
: _th(INVALID_BTHREAD)
, _state(INIT) {}
DiscoveryClient::~DiscoveryClient() {
Cancel();
}
int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
const std::string s = buf.to_string();
BUTIL_RAPIDJSON_NAMESPACE::Document d;
d.Parse(s.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr_code = d.FindMember("code");
if (itr_code == d.MemberEnd() || !itr_code->value.IsInt()) {
LOG(ERROR) << "Invalid `code' field in " << buf;
return -1;
}
int code = itr_code->value.GetInt();
auto itr_message = d.FindMember("message");
if (itr_message != d.MemberEnd() && itr_message->value.IsString() && error_text) {
error_text->assign(itr_message->value.GetString(),
itr_message->value.GetStringLength());
}
return code;
}
void* DiscoveryClient::PeriodicRenew(void* arg) {
DiscoveryClient* d = static_cast<DiscoveryClient*>(arg);
int consecutive_renew_error = 0;
int64_t init_sleep_s = FLAGS_discovery_renew_interval_s / 2 +
butil::fast_rand_less_than(FLAGS_discovery_renew_interval_s / 2);
if (bthread_usleep(init_sleep_s * 1000000) != 0) {
if (errno == ESTOP) {
return NULL;
}
}
while (!bthread_stopped(bthread_self())) {
if (consecutive_renew_error == FLAGS_discovery_reregister_threshold) {
LOG(WARNING) << "Reregister since discovery renew error threshold reached";
std::unique_lock<butil::Mutex> mu(d->_mutex);
switch (d->_state) {
case INIT:
CHECK(false) << "Impossible";
return NULL;
case REGISTERING:
case REGISTERED:
break;
case CANCELED:
return NULL;
default:
CHECK(false) << "Impossible";
return NULL;
}
// Do register until succeed or Cancel is called
while (!bthread_stopped(bthread_self())) {
if (d->do_register() == 0) {
break;
}
bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
}
consecutive_renew_error = 0;
}
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/renew";
cntl.http_request().set_content_type("application/x-www-form-urlencoded");
butil::IOBufBuilder os;
os << "appid=" << d->_appid
<< "&hostname=" << d->_hostname
<< "&env=" << d->_env
<< "&region=" << d->_region
<< "&zone=" << d->_zone;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText();
consecutive_renew_error++;
continue;
}
std::string error_text;
int rc = ParseCommonResult(cntl.response_attachment(), &error_text);
if (rc != 0) {
LOG(ERROR) << "Fail to renew " << d->_hostname << " to " << d->_appid
<< ": " << error_text;
consecutive_renew_error++;
continue;
}
consecutive_renew_error = 0;
if (bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000) != 0) {
if (errno == ESTOP) {
break;
}
}
}
return NULL;
}
int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
if (!req.IsValid()) {
return -1;
}
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
_state = REGISTERING;
break;
case REGISTERING:
case REGISTERED:
LOG(WARNING) << "Discovery Appid=" << req.appid
<<" is registering or registered";
return 0;
case CANCELED:
LOG(ERROR) << "Discovery Appid=" << req.appid << " is canceled";
return -1;
default:
CHECK(false) << "Impossible";
return -1;
}
}
pthread_once(&s_init_channel_once, InitChannel);
_appid = req.appid;
_hostname = req.hostname;
_addrs = req.addrs;
_env = req.env;
_region = req.region;
_zone = req.zone;
_status = req.status;
_version = req.version;
_metadata = req.metadata;
if (do_register() != 0) {
return -1;
}
if (bthread_start_background(&_th, NULL, PeriodicRenew, this) != 0) {
LOG(ERROR) << "Fail to start background PeriodicRenew";
return -1;
}
bool is_canceled = false;
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
CHECK(false) << "Impossible";
return -1;
case REGISTERING:
_state = REGISTERED;
break;
case REGISTERED:
CHECK(false) << "Impossible";
return -1;
case CANCELED:
is_canceled = true;
break;
default:
CHECK(false) << "Impossible";
return -1;
}
}
if (is_canceled) {
bthread_stop(_th);
bthread_join(_th, NULL);
return do_cancel();
}
return 0;
}
int DiscoveryClient::do_register() {
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/register";
cntl.http_request().set_content_type("application/x-www-form-urlencoded");
butil::IOBufBuilder os;
os << "appid=" << _appid
<< "&hostname=" << _hostname
<< "&addrs=" << _addrs
<< "&env=" << _env
<< "&zone=" << _zone
<< "&region=" << _region
<< "&status=" << _status
<< "&version=" << _version
<< "&metadata=" << _metadata;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to register " << _appid << ": " << cntl.ErrorText();
return -1;
}
std::string error_text;
int rc = ParseCommonResult(cntl.response_attachment(), &error_text);
if (rc != 0) {
LOG(ERROR) << "Fail to register " << _hostname << " to " << _appid
<< ": " << error_text;
return -1;
}
return 0;
}
int DiscoveryClient::Cancel() {
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
case REGISTERING:
_state = CANCELED;
return 0;
case REGISTERED:
_state = CANCELED;
break;
case CANCELED:
return 0;
default:
CHECK(false) << "Impossible";
return -1;
}
}
bthread_stop(_th);
bthread_join(_th, NULL);
return do_cancel();
}
int DiscoveryClient::do_cancel() {
pthread_once(&s_init_channel_once, InitChannel);
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/cancel";
butil::IOBufBuilder os;
os << "appid=" << _appid
<< "&hostname=" << _hostname
<< "&env=" << _env
<< "&region=" << _region
<< "&zone=" << _zone;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText();
return -1;
}
std::string error_text;
int rc = ParseCommonResult(cntl.response_attachment(), &error_text);
if (rc != 0) {
LOG(ERROR) << "Fail to cancel " << _hostname << " in " << _appid
<< ": " << error_text;
return -1;
}
return 0;
}
} // namespace policy
} // namespace brpc
......@@ -19,6 +19,7 @@
#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"
#include "butil/synchronization/lock.h"
namespace brpc {
namespace policy {
......@@ -33,18 +34,61 @@ private:
NamingService* New() const override;
void Destroy() override;
};
struct DiscoveryRegisterParam {
std::string appid;
std::string hostname;
std::string env;
std::string zone;
std::string region;
std::string addrs; // splitted by ','
int status;
std::string version;
std::string metadata;
bool IsValid() const;
};
// ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple instances to register, you need multiple
// DiscoveryClient.
class DiscoveryClient {
public:
DiscoveryClient();
~DiscoveryClient();
int Register(const DiscoveryRegisterParam& req);
int Cancel();
private:
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
int ParseFetchsResult(const butil::IOBuf& buf, const char* service_name,
std::vector<ServerNode>* servers);
static void* PeriodicRenew(void* arg);
int do_cancel();
int do_register();
Channel _channel;
bool _is_initialized = false;
private:
enum State {
INIT,
REGISTERING,
REGISTERED,
CANCELED
};
bthread_t _th;
State _state;
butil::Mutex _mutex;
std::string _appid;
std::string _hostname;
std::string _addrs;
std::string _env;
std::string _region;
std::string _zone;
int _status;
std::string _version;
std::string _metadata;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
......@@ -863,7 +863,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if (_health_check_interval_s > 0) {
GetOrNewSharedPart( )->circuit_breaker.MarkAsBroken();
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
butil::milliseconds_from_now(GetOrNewSharedPart()->
......
......@@ -30,6 +30,13 @@ DECLARE_string(consul_file_naming_service_dir);
DECLARE_string(consul_service_discovery_url);
DECLARE_string(discovery_api_addr);
DECLARE_string(discovery_env);
DECLARE_int32(discovery_renew_interval_s);
// Defined in discovery_naming_service.cpp
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<brpc::ServerNode>* servers);
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
} // policy
} // brpc
......@@ -445,7 +452,7 @@ static const std::string s_fetchs_result = R"({
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
"grpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
......@@ -472,7 +479,7 @@ static const std::string s_fetchs_result = R"({
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
"grpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
......@@ -510,23 +517,26 @@ static std::string s_nodes_result = R"({
]
})";
TEST(NamingServiceTest, discovery_parse_function) {
std::vector<brpc::ServerNode> servers;
brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf;
buf.append(s_fetchs_result);
ASSERT_EQ(0, dcns.ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
ASSERT_EQ(0, brpc::policy::ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)1, servers.size());
buf.clear();
buf.append(s_nodes_result);
std::string server;
ASSERT_EQ(0, dcns.ParseNodesResult(buf, &server));
ASSERT_EQ(0, brpc::policy::ParseNodesResult(buf, &server));
ASSERT_EQ("127.0.0.1:8635", server);
}
class DiscoveryNamingServiceImpl : public test::DiscoveryNamingService {
public:
DiscoveryNamingServiceImpl () {}
DiscoveryNamingServiceImpl()
: _renew_count(0)
, _cancel_count(0) {}
virtual ~DiscoveryNamingServiceImpl() {}
void Nodes(google::protobuf::RpcController* cntl_base,
......@@ -546,15 +556,67 @@ public:
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(s_fetchs_result);
}
void Register(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
return;
}
void Renew(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
_renew_count++;
return;
}
void Cancel(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
_cancel_count++;
return;
}
int RenewCount() const { return _renew_count; }
int CancelCount() const { return _cancel_count; }
private:
int _renew_count;
int _cancel_count;
};
TEST(NamingServiceTest, discovery_sanity) {
brpc::policy::FLAGS_discovery_api_addr = "http://127.0.0.1:8635/discovery/nodes";
brpc::policy::FLAGS_discovery_renew_interval_s = 1;
brpc::Server server;
DiscoveryNamingServiceImpl svc;
std::string rest_mapping =
"/discovery/nodes => Nodes, "
"/discovery/fetchs => Fetchs";
"/discovery/fetchs => Fetchs, "
"/discovery/register => Register, "
"/discovery/renew => Renew, "
"/discovery/cancel => Cancel";
ASSERT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE,
rest_mapping.c_str()));
ASSERT_EQ(0, server.Start("localhost:8635", NULL));
......@@ -562,8 +624,35 @@ TEST(NamingServiceTest, discovery_sanity) {
brpc::policy::DiscoveryNamingService dcns;
std::vector<brpc::ServerNode> servers;
ASSERT_EQ(0, dcns.GetServers("admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
}
ASSERT_EQ((size_t)1, servers.size());
brpc::policy::DiscoveryClient dc;
brpc::policy::DiscoveryRegisterParam dparam;
dparam.appid = "main.test";
dparam.hostname = "hostname";
dparam.addrs = "grpc://10.0.0.1:8000";
dparam.env = "dev";
dparam.zone = "sh001";
dparam.status = 1;
dparam.version = "v1";
ASSERT_EQ(0, dc.Register(dparam));
bthread_usleep(1000000);
ASSERT_EQ(0, dc.Cancel());
ASSERT_GT(svc.RenewCount(), 0);
ASSERT_EQ(svc.CancelCount(), 1);
brpc::policy::DiscoveryClient dc2;
ASSERT_EQ(0, dc2.Cancel());
ASSERT_EQ(-1, dc2.Register(dparam));
{
brpc::policy::DiscoveryClient dc3;
ASSERT_EQ(0, dc3.Register(dparam));
ASSERT_EQ(0, dc3.Cancel());
}
// dtor of DiscoveryClient also calls Cancel(), we need to ensure that
// Cancel() is called only once.
ASSERT_EQ(svc.CancelCount(), 2);
}
} //namespace
......@@ -56,6 +56,9 @@ service UserNamingService {
service DiscoveryNamingService {
rpc Nodes(HttpRequest) returns (HttpResponse);
rpc Fetchs(HttpRequest) returns (HttpResponse);
rpc Register(HttpRequest) returns (HttpResponse);
rpc Renew(HttpRequest) returns (HttpResponse);
rpc Cancel(HttpRequest) returns (HttpResponse);
};
enum State0 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment