Commit bc7165fb authored by jamesge's avatar jamesge

Remove fetch related methods from DiscoveryClient

parent 6f4dd466
...@@ -32,11 +32,12 @@ namespace brpc { ...@@ -32,11 +32,12 @@ namespace brpc {
namespace policy { namespace policy {
#ifdef BILIBILI_INTERNAL #ifdef BILIBILI_INTERNAL
DEFINE_string(discovery_api_addr, "http://api.bilibili.co/discovery/nodes", # define DEFAULT_DISCOVERY_API_ADDR "http://api.bilibili.co/discovery/nodes"
"The address of discovery api");
#else #else
DEFINE_string(discovery_api_addr, "", "The address of discovery api"); # define DEFAULT_DISCOVERY_API_ADDR ""
#endif #endif
DEFINE_string(discovery_api_addr, DEFAULT_DISCOVERY_API_ADDR, "The address of discovery api");
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests"); DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "Environment of services"); DEFINE_string(discovery_env, "prod", "Environment of services");
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all"); DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
...@@ -44,29 +45,8 @@ DEFINE_int32(discovery_renew_interval_s, 30, "The interval between two consecuti ...@@ -44,29 +45,8 @@ DEFINE_int32(discovery_renew_interval_s, 30, "The interval between two consecuti
DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyond" DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyond"
" which Register would be called again"); " which Register would be called again");
static Channel s_discovery_channel; static pthread_once_t s_init_discovery_channel_once = PTHREAD_ONCE_INIT;
static pthread_once_t s_init_channel_once = PTHREAD_ONCE_INIT; static Channel* s_discovery_channel = NULL;
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
DiscoveryFetchsParam params{
service_name, FLAGS_discovery_env, FLAGS_discovery_status};
return _client.Fetchs(params, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) { int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document d; BUTIL_RAPIDJSON_NAMESPACE::Document d;
...@@ -105,7 +85,7 @@ int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) { ...@@ -105,7 +85,7 @@ int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
return 0; return 0;
} }
static void InitChannel() { static void InitDiscoveryChannel() {
Channel api_channel; Channel api_channel;
ChannelOptions channel_options; ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP; channel_options.protocol = PROTOCOL_HTTP;
...@@ -128,102 +108,24 @@ static void InitChannel() { ...@@ -128,102 +108,24 @@ static void InitChannel() {
LOG(FATAL) << "Fail to parse nodes result from discovery api server"; LOG(FATAL) << "Fail to parse nodes result from discovery api server";
return; return;
} }
if (s_discovery_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) { s_discovery_channel = new Channel;
if (s_discovery_channel->Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << discovery_addr; LOG(FATAL) << "Fail to init channel to " << discovery_addr;
return; return;
} }
} }
int ParseFetchsResult(const butil::IOBuf& buf, Channel* GetDiscoveryChannel() {
const char* service_name, pthread_once(&s_init_discovery_channel_once, InitDiscoveryChannel);
std::vector<ServerNode>* servers) { return s_discovery_channel;
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
return -1;
}
auto itr_data = d.FindMember("data");
if (itr_data == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery fetchs response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr_data->value;
auto itr_service = data.FindMember(service_name);
if (itr_service == data.MemberEnd()) {
LOG(ERROR) << "No " << service_name << " field in discovery response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& services = itr_service->value;
auto itr_instances = services.FindMember("instances");
if (itr_instances == services.MemberEnd()) {
LOG(ERROR) << "Fail to find instances";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = itr_instances->value;
if (!instances.IsArray()) {
LOG(ERROR) << "Fail to parse instances as an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
std::string metadata;
// convert metadata in object to string
auto itr_metadata = instances[i].FindMember("metadata");
if (itr_metadata != instances[i].MemberEnd()) {
BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer buffer;
BUTIL_RAPIDJSON_NAMESPACE::Writer<BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer> writer(buffer);
itr_metadata->value.Accept(writer);
metadata.assign(buffer.GetBuffer(), buffer.GetSize());
}
auto itr = instances[i].FindMember("addrs");
if (itr == instances[i].MemberEnd() || !itr->value.IsArray()) {
LOG(ERROR) << "Fail to find addrs or addrs is not an array";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = itr->value;
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
if (!addrs[j].IsString()) {
continue;
}
// The result returned by discovery include protocol prefix, such as
// http://172.22.35.68:6686, which should be removed.
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
if (pos != 4 /* sizeof("grpc") */ ||
strncmp("grpc", addr.data(), 4) != 0) {
// Skip server that has prefix but not start with "grpc"
continue;
}
addr.remove_prefix(pos + 3);
}
ServerNode node;
node.tag = metadata;
// Variable addr contains data from addrs[j].GetString(), it is a
// null-terminated string, so it is safe to pass addr.data() as the
// first parameter to str2endpoint.
if (str2endpoint(addr.data(), &node.addr) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
servers->push_back(node);
}
}
return 0;
} }
bool DiscoveryRegisterParam::IsValid() const { bool DiscoveryRegisterParam::IsValid() const {
return !appid.empty() && !hostname.empty() && !addrs.empty() && return !appid.empty() && !hostname.empty() && !addrs.empty() &&
!env.empty() && !zone.empty() && !version.empty(); !env.empty() && !zone.empty() && !version.empty();
} }
bool DiscoveryFetchsParam::IsValid() const {
return !appid.empty() && !env.empty() && !status.empty();
}
DiscoveryClient::DiscoveryClient() DiscoveryClient::DiscoveryClient()
: _th(INVALID_BTHREAD) : _th(INVALID_BTHREAD)
, _registered(false) {} , _registered(false) {}
...@@ -270,7 +172,7 @@ int DiscoveryClient::DoRenew() const { ...@@ -270,7 +172,7 @@ int DiscoveryClient::DoRenew() const {
<< "&region=" << _region << "&region=" << _region
<< "&zone=" << _zone; << "&zone=" << _zone;
os.move_to(cntl.request_attachment()); os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText(); LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText();
return -1; return -1;
...@@ -325,7 +227,6 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) { ...@@ -325,7 +227,6 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
_registered.exchange(true, butil::memory_order_release)) { _registered.exchange(true, butil::memory_order_release)) {
return 0; return 0;
} }
pthread_once(&s_init_channel_once, InitChannel);
_appid = req.appid; _appid = req.appid;
_hostname = req.hostname; _hostname = req.hostname;
_addrs = req.addrs; _addrs = req.addrs;
...@@ -362,7 +263,7 @@ int DiscoveryClient::DoRegister() const { ...@@ -362,7 +263,7 @@ int DiscoveryClient::DoRegister() const {
<< "&version=" << _version << "&version=" << _version
<< "&metadata=" << _metadata; << "&metadata=" << _metadata;
os.move_to(cntl.request_attachment()); os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to register " << _appid << ": " << cntl.ErrorText(); LOG(ERROR) << "Fail to register " << _appid << ": " << cntl.ErrorText();
return -1; return -1;
...@@ -377,7 +278,6 @@ int DiscoveryClient::DoRegister() const { ...@@ -377,7 +278,6 @@ int DiscoveryClient::DoRegister() const {
} }
int DiscoveryClient::DoCancel() const { int DiscoveryClient::DoCancel() const {
pthread_once(&s_init_channel_once, InitChannel);
Controller cntl; Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST); cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/cancel"; cntl.http_request().uri() = "/discovery/cancel";
...@@ -389,7 +289,7 @@ int DiscoveryClient::DoCancel() const { ...@@ -389,7 +289,7 @@ int DiscoveryClient::DoCancel() const {
<< "&region=" << _region << "&region=" << _region
<< "&zone=" << _zone; << "&zone=" << _zone;
os.move_to(cntl.request_attachment()); os.move_to(cntl.request_attachment());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText(); LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText();
return -1; return -1;
...@@ -403,24 +303,119 @@ int DiscoveryClient::DoCancel() const { ...@@ -403,24 +303,119 @@ int DiscoveryClient::DoCancel() const {
return 0; return 0;
} }
int DiscoveryClient::Fetchs(const DiscoveryFetchsParam& req, // ========== DiscoveryNamingService =============
std::vector<ServerNode>* servers) const {
if (!req.IsValid()) { int DiscoveryNamingService::GetServers(const char* service_name,
return false; std::vector<ServerNode>* servers) {
if (service_name == NULL || *service_name == '\0' ||
FLAGS_discovery_env.empty() ||
FLAGS_discovery_status.empty()) {
LOG_ONCE(ERROR) << "Invalid parameters";
return -1;
} }
pthread_once(&s_init_channel_once, InitChannel);
servers->clear(); servers->clear();
Controller cntl; Controller cntl;
cntl.http_request().uri() = butil::string_printf( cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", req.appid.c_str(), "/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
req.env.c_str(), req.status.c_str()); FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText(); LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText();
return -1; return -1;
} }
return ParseFetchsResult(cntl.response_attachment(), req.appid.c_str(), servers);
const std::string response = cntl.response_attachment().to_string();
BUTIL_RAPIDJSON_NAMESPACE::Document d;
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << response << " as json object";
return -1;
}
auto itr_data = d.FindMember("data");
if (itr_data == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery/fetchs response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr_data->value;
auto itr_service = data.FindMember(service_name);
if (itr_service == data.MemberEnd()) {
LOG(ERROR) << "No " << service_name << " field in discovery response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& services = itr_service->value;
auto itr_instances = services.FindMember("instances");
if (itr_instances == services.MemberEnd()) {
LOG(ERROR) << "Fail to find instances";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = itr_instances->value;
if (!instances.IsArray()) {
LOG(ERROR) << "Fail to parse instances as an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
std::string metadata;
// convert metadata in object to string
auto itr_metadata = instances[i].FindMember("metadata");
if (itr_metadata != instances[i].MemberEnd()) {
BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer buffer;
BUTIL_RAPIDJSON_NAMESPACE::Writer<BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer> writer(buffer);
itr_metadata->value.Accept(writer);
metadata.assign(buffer.GetBuffer(), buffer.GetSize());
}
auto itr = instances[i].FindMember("addrs");
if (itr == instances[i].MemberEnd() || !itr->value.IsArray()) {
LOG(ERROR) << "Fail to find addrs or addrs is not an array";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = itr->value;
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
if (!addrs[j].IsString()) {
continue;
}
// The result returned by discovery include protocol prefix, such as
// http://172.22.35.68:6686, which should be removed.
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
if (pos != 4 /* sizeof("grpc") */ ||
strncmp("grpc", addr.data(), 4) != 0) {
// Skip server that has prefix but not start with "grpc"
continue;
}
addr.remove_prefix(pos + 3);
}
ServerNode node;
node.tag = metadata;
// Variable addr contains data from addrs[j].GetString(), it is a
// null-terminated string, so it is safe to pass addr.data() as the
// first parameter to str2endpoint.
if (str2endpoint(addr.data(), &node.addr) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
servers->push_back(node);
}
}
return 0;
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
} }
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -41,14 +41,6 @@ struct DiscoveryRegisterParam { ...@@ -41,14 +41,6 @@ struct DiscoveryRegisterParam {
bool IsValid() const; bool IsValid() const;
}; };
struct DiscoveryFetchsParam {
std::string appid;
std::string env;
std::string status;
bool IsValid() const;
};
// ONE DiscoveryClient corresponds to ONE service instance. // ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple service instances to register, // If your program has multiple service instances to register,
// you need multiple DiscoveryClient. // you need multiple DiscoveryClient.
...@@ -59,7 +51,6 @@ public: ...@@ -59,7 +51,6 @@ public:
~DiscoveryClient(); ~DiscoveryClient();
int Register(const DiscoveryRegisterParam& req); int Register(const DiscoveryRegisterParam& req);
int Fetchs(const DiscoveryFetchsParam& req, std::vector<ServerNode>* servers) const;
private: private:
static void* PeriodicRenew(void* arg); static void* PeriodicRenew(void* arg);
......
...@@ -117,8 +117,10 @@ private: ...@@ -117,8 +117,10 @@ private:
pthread_t _tid; pthread_t _tid;
}; };
#ifndef UNIT_TEST
static PassiveStatus<double>* s_cumulated_time_bvar = NULL; static PassiveStatus<double>* s_cumulated_time_bvar = NULL;
static bvar::PerSecond<bvar::PassiveStatus<double> >* s_sampling_thread_usage_bvar = NULL; static bvar::PerSecond<bvar::PassiveStatus<double> >* s_sampling_thread_usage_bvar = NULL;
#endif
void SamplerCollector::run() { void SamplerCollector::run() {
#ifndef UNIT_TEST #ifndef UNIT_TEST
......
...@@ -47,9 +47,6 @@ DECLARE_string(discovery_env); ...@@ -47,9 +47,6 @@ DECLARE_string(discovery_env);
DECLARE_int32(discovery_renew_interval_s); DECLARE_int32(discovery_renew_interval_s);
// Defined in discovery_naming_service.cpp // Defined in discovery_naming_service.cpp
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<brpc::ServerNode>* servers);
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr); int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
} // policy } // policy
...@@ -534,21 +531,6 @@ static std::string s_nodes_result = R"({ ...@@ -534,21 +531,6 @@ static std::string s_nodes_result = R"({
})"; })";
TEST(NamingServiceTest, discovery_parse_function) {
std::vector<brpc::ServerNode> servers;
brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf;
buf.append(s_fetchs_result);
ASSERT_EQ(0, brpc::policy::ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)1, servers.size());
ASSERT_EQ(servers[0].tag, "{\"weight\":\"10\",\"cluster\":\"\"}");
buf.clear();
buf.append(s_nodes_result);
std::string server;
ASSERT_EQ(0, brpc::policy::ParseNodesResult(buf, &server));
ASSERT_EQ("127.0.0.1:8635", server);
}
class DiscoveryNamingServiceImpl : public test::DiscoveryNamingService { class DiscoveryNamingServiceImpl : public test::DiscoveryNamingService {
public: public:
DiscoveryNamingServiceImpl() DiscoveryNamingServiceImpl()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment