Unverified Commit 1aeef03c authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #573 from zyearn/discovery

add discovery client
parents b2cdefdc 770cc50d
......@@ -17,6 +17,8 @@
#include <gflags/gflags.h>
#include "butil/third_party/rapidjson/document.h"
#include "butil/string_printf.h"
#include "butil/fast_rand.h"
#include "bthread/bthread.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/policy/discovery_naming_service.h"
......@@ -33,14 +35,44 @@ DEFINE_string(discovery_api_addr, "", "The address of discovery api");
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "Environment of services");
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
DEFINE_int32(discovery_renew_interval_s, 30, "The interval between two consecutive renews");
DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyond"
" which Register would be called again");
int DiscoveryNamingService::ParseNodesResult(
const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document nodes;
static Channel s_discovery_channel;
static pthread_once_t s_init_channel_once = PTHREAD_ONCE_INIT;
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
DiscoveryFetchsParam params{
service_name, FLAGS_discovery_env, FLAGS_discovery_status};
return _client.Fetchs(params, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
nodes.Parse(response.c_str());
auto itr = nodes.FindMember("data");
if (itr == nodes.MemberEnd()) {
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr = d.FindMember("data");
if (itr == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery nodes response";
return -1;
}
......@@ -68,13 +100,45 @@ int DiscoveryNamingService::ParseNodesResult(
return 0;
}
int DiscoveryNamingService::ParseFetchsResult(
const butil::IOBuf& buf,
static void InitChannel() {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(FATAL) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
LOG(FATAL) << "Fail to parse nodes result from discovery api server";
return;
}
if (s_discovery_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << discovery_addr;
return;
}
}
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr_data = d.FindMember("data");
if (itr_data == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery fetchs response";
......@@ -114,6 +178,11 @@ int DiscoveryNamingService::ParseFetchsResult(
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
if (pos != 4 /* sizeof("grpc") */ ||
strncmp("grpc", addr.data(), 4) != 0) {
// Skip server that has prefix but not start with "grpc"
continue;
}
addr.remove_prefix(pos + 3);
}
ServerNode node;
......@@ -130,63 +199,210 @@ int DiscoveryNamingService::ParseFetchsResult(
return 0;
}
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
if (!_is_initialized) {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << FLAGS_discovery_api_addr;
bool DiscoveryRegisterParam::IsValid() const {
return !appid.empty() && !hostname.empty() && !addrs.empty() &&
!env.empty() && !zone.empty() && !version.empty();
}
bool DiscoveryFetchsParam::IsValid() const {
return !appid.empty() && !env.empty() && !status.empty();
}
DiscoveryClient::DiscoveryClient()
: _th(INVALID_BTHREAD)
, _registered(false) {}
DiscoveryClient::~DiscoveryClient() {
if (_registered.load(butil::memory_order_acquire)) {
bthread_stop(_th);
bthread_join(_th, NULL);
DoCancel();
}
}
int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
const std::string s = buf.to_string();
BUTIL_RAPIDJSON_NAMESPACE::Document d;
d.Parse(s.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr_code = d.FindMember("code");
if (itr_code == d.MemberEnd() || !itr_code->value.IsInt()) {
LOG(ERROR) << "Invalid `code' field in " << buf;
return -1;
}
int code = itr_code->value.GetInt();
auto itr_message = d.FindMember("message");
if (itr_message != d.MemberEnd() && itr_message->value.IsString() && error_text) {
error_text->assign(itr_message->value.GetString(),
itr_message->value.GetStringLength());
}
return code;
}
int DiscoveryClient::DoRenew() const {
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/renew";
cntl.http_request().set_content_type("application/x-www-form-urlencoded");
butil::IOBufBuilder os;
os << "appid=" << _appid
<< "&hostname=" << _hostname
<< "&env=" << _env
<< "&region=" << _region
<< "&zone=" << _zone;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText();
return -1;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
std::string error_text;
if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
LOG(ERROR) << "Fail to renew " << _hostname << " to " << _appid
<< ": " << error_text;
return -1;
}
return 0;
}
void* DiscoveryClient::PeriodicRenew(void* arg) {
DiscoveryClient* d = static_cast<DiscoveryClient*>(arg);
int consecutive_renew_error = 0;
int64_t init_sleep_s = FLAGS_discovery_renew_interval_s / 2 +
butil::fast_rand_less_than(FLAGS_discovery_renew_interval_s / 2);
if (bthread_usleep(init_sleep_s * 1000000) != 0) {
if (errno == ESTOP) {
return NULL;
}
}
while (!bthread_stopped(bthread_self())) {
if (consecutive_renew_error == FLAGS_discovery_reregister_threshold) {
LOG(WARNING) << "Re-register since discovery renew error threshold reached";
// Do register until succeed or Cancel is called
while (!bthread_stopped(bthread_self())) {
if (d->DoRegister() == 0) {
break;
}
bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
}
consecutive_renew_error = 0;
}
if (d->DoRenew() != 0) {
consecutive_renew_error++;
continue;
}
consecutive_renew_error = 0;
bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
}
return NULL;
}
if (_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << discovery_addr;
int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
if (!req.IsValid()) {
return -1;
}
_is_initialized = true;
if (_registered.load(butil::memory_order_relaxed) ||
_registered.exchange(true, butil::memory_order_release)) {
return 0;
}
pthread_once(&s_init_channel_once, InitChannel);
_appid = req.appid;
_hostname = req.hostname;
_addrs = req.addrs;
_env = req.env;
_region = req.region;
_zone = req.zone;
_status = req.status;
_version = req.version;
_metadata = req.metadata;
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
if (DoRegister() != 0) {
return -1;
}
return ParseFetchsResult(cntl.response_attachment(), service_name, servers);
if (bthread_start_background(&_th, NULL, PeriodicRenew, this) != 0) {
LOG(ERROR) << "Fail to start background PeriodicRenew";
return -1;
}
return 0;
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
int DiscoveryClient::DoRegister() const {
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/register";
cntl.http_request().set_content_type("application/x-www-form-urlencoded");
butil::IOBufBuilder os;
os << "appid=" << _appid
<< "&hostname=" << _hostname
<< "&addrs=" << _addrs
<< "&env=" << _env
<< "&zone=" << _zone
<< "&region=" << _region
<< "&status=" << _status
<< "&version=" << _version
<< "&metadata=" << _metadata;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to register " << _appid << ": " << cntl.ErrorText();
return -1;
}
std::string error_text;
if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
LOG(ERROR) << "Fail to register " << _hostname << " to " << _appid
<< ": " << error_text;
return -1;
}
return 0;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
int DiscoveryClient::DoCancel() const {
pthread_once(&s_init_channel_once, InitChannel);
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/cancel";
butil::IOBufBuilder os;
os << "appid=" << _appid
<< "&hostname=" << _hostname
<< "&env=" << _env
<< "&region=" << _region
<< "&zone=" << _zone;
os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText();
return -1;
}
std::string error_text;
if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
LOG(ERROR) << "Fail to cancel " << _hostname << " in " << _appid
<< ": " << error_text;
return -1;
}
return 0;
}
void DiscoveryNamingService::Destroy() {
delete this;
int DiscoveryClient::Fetchs(const DiscoveryFetchsParam& req,
std::vector<ServerNode>* servers) const {
if (!req.IsValid()) {
return false;
}
pthread_once(&s_init_channel_once, InitChannel);
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", req.appid.c_str(),
req.env.c_str(), req.status.c_str());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText();
return -1;
}
return ParseFetchsResult(cntl.response_attachment(), req.appid.c_str(), servers);
}
} // namespace policy
......
......@@ -19,10 +19,65 @@
#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"
#include "butil/synchronization/lock.h"
namespace brpc {
namespace policy {
struct DiscoveryRegisterParam {
std::string appid;
std::string hostname;
std::string env;
std::string zone;
std::string region;
std::string addrs; // splitted by ','
int status;
std::string version;
std::string metadata;
bool IsValid() const;
};
struct DiscoveryFetchsParam {
std::string appid;
std::string env;
std::string status;
bool IsValid() const;
};
// ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple service instances to register,
// you need multiple DiscoveryClient.
// Note: Unregister is automatically called in dtor.
class DiscoveryClient {
public:
DiscoveryClient();
~DiscoveryClient();
int Register(const DiscoveryRegisterParam& req);
int Fetchs(const DiscoveryFetchsParam& req, std::vector<ServerNode>* servers) const;
private:
static void* PeriodicRenew(void* arg);
int DoCancel() const;
int DoRegister() const;
int DoRenew() const;
private:
bthread_t _th;
butil::atomic<bool> _registered;
std::string _appid;
std::string _hostname;
std::string _addrs;
std::string _env;
std::string _region;
std::string _zone;
int _status;
std::string _version;
std::string _metadata;
};
class DiscoveryNamingService : public PeriodicNamingService {
private:
int GetServers(const char* service_name,
......@@ -35,16 +90,11 @@ private:
void Destroy() override;
private:
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
int ParseFetchsResult(const butil::IOBuf& buf, const char* service_name,
std::vector<ServerNode>* servers);
Channel _channel;
bool _is_initialized = false;
DiscoveryClient _client;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
......@@ -863,7 +863,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if (_health_check_interval_s > 0) {
GetOrNewSharedPart( )->circuit_breaker.MarkAsBroken();
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
PeriodicTaskManager::StartTaskAt(
new HealthCheckTask(id()),
butil::milliseconds_from_now(GetOrNewSharedPart()->
......
......@@ -30,6 +30,13 @@ DECLARE_string(consul_file_naming_service_dir);
DECLARE_string(consul_service_discovery_url);
DECLARE_string(discovery_api_addr);
DECLARE_string(discovery_env);
DECLARE_int32(discovery_renew_interval_s);
// Defined in discovery_naming_service.cpp
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<brpc::ServerNode>* servers);
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
} // policy
} // brpc
......@@ -445,7 +452,7 @@ static const std::string s_fetchs_result = R"({
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
"grpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
......@@ -472,7 +479,7 @@ static const std::string s_fetchs_result = R"({
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
"grpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
......@@ -510,23 +517,26 @@ static std::string s_nodes_result = R"({
]
})";
TEST(NamingServiceTest, discovery_parse_function) {
std::vector<brpc::ServerNode> servers;
brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf;
buf.append(s_fetchs_result);
ASSERT_EQ(0, dcns.ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
ASSERT_EQ(0, brpc::policy::ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)1, servers.size());
buf.clear();
buf.append(s_nodes_result);
std::string server;
ASSERT_EQ(0, dcns.ParseNodesResult(buf, &server));
ASSERT_EQ(0, brpc::policy::ParseNodesResult(buf, &server));
ASSERT_EQ("127.0.0.1:8635", server);
}
class DiscoveryNamingServiceImpl : public test::DiscoveryNamingService {
public:
DiscoveryNamingServiceImpl () {}
DiscoveryNamingServiceImpl()
: _renew_count(0)
, _cancel_count(0) {}
virtual ~DiscoveryNamingServiceImpl() {}
void Nodes(google::protobuf::RpcController* cntl_base,
......@@ -546,15 +556,67 @@ public:
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(s_fetchs_result);
}
void Register(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
return;
}
void Renew(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
_renew_count++;
return;
}
void Cancel(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(R"({
"code": 0,
"message": "0"
})");
_cancel_count++;
return;
}
int RenewCount() const { return _renew_count; }
int CancelCount() const { return _cancel_count; }
private:
int _renew_count;
int _cancel_count;
};
TEST(NamingServiceTest, discovery_sanity) {
brpc::policy::FLAGS_discovery_api_addr = "http://127.0.0.1:8635/discovery/nodes";
brpc::policy::FLAGS_discovery_renew_interval_s = 1;
brpc::Server server;
DiscoveryNamingServiceImpl svc;
std::string rest_mapping =
"/discovery/nodes => Nodes, "
"/discovery/fetchs => Fetchs";
"/discovery/fetchs => Fetchs, "
"/discovery/register => Register, "
"/discovery/renew => Renew, "
"/discovery/cancel => Cancel";
ASSERT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE,
rest_mapping.c_str()));
ASSERT_EQ(0, server.Start("localhost:8635", NULL));
......@@ -562,8 +624,31 @@ TEST(NamingServiceTest, discovery_sanity) {
brpc::policy::DiscoveryNamingService dcns;
std::vector<brpc::ServerNode> servers;
ASSERT_EQ(0, dcns.GetServers("admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
ASSERT_EQ((size_t)1, servers.size());
brpc::policy::DiscoveryRegisterParam dparam;
dparam.appid = "main.test";
dparam.hostname = "hostname";
dparam.addrs = "grpc://10.0.0.1:8000";
dparam.env = "dev";
dparam.zone = "sh001";
dparam.status = 1;
dparam.version = "v1";
{
brpc::policy::DiscoveryClient dc;
}
// Cancel is called iff Register is called
ASSERT_EQ(svc.CancelCount(), 0);
{
brpc::policy::DiscoveryClient dc;
// Two Register should start one Renew task , and make
// svc.RenewCount() be one.
ASSERT_EQ(0, dc.Register(dparam));
ASSERT_EQ(0, dc.Register(dparam));
bthread_usleep(1000000);
}
ASSERT_EQ(svc.RenewCount(), 1);
ASSERT_EQ(svc.CancelCount(), 1);
}
} //namespace
......@@ -208,7 +208,7 @@ TEST(ButexTest, wait_without_stop) {
ASSERT_EQ(0, bthread_join(th, NULL));
tm.stop();
ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 40);
ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 250);
}
bthread::butex_destroy(butex);
}
......
......@@ -56,6 +56,9 @@ service UserNamingService {
service DiscoveryNamingService {
rpc Nodes(HttpRequest) returns (HttpResponse);
rpc Fetchs(HttpRequest) returns (HttpResponse);
rpc Register(HttpRequest) returns (HttpResponse);
rpc Renew(HttpRequest) returns (HttpResponse);
rpc Cancel(HttpRequest) returns (HttpResponse);
};
enum State0 {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment