Commit 4ee40610 authored by zhujiashun's avatar zhujiashun

Replace HasMember with FindMember of RapidJson in consul and discovery

parent c5c9af5f
...@@ -135,17 +135,24 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -135,17 +135,24 @@ int ConsulNamingService::GetServers(const char* service_name,
} }
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < services.Size(); ++i) { for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < services.Size(); ++i) {
if (!services[i].HasMember("Service")) { BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr =
services[i].FindMember("Service");
if (itr == services[i].MemberEnd()) {
LOG(ERROR) << "No service info in node: " LOG(ERROR) << "No service info in node: "
<< RapidjsonValueToString(services[i]); << RapidjsonValueToString(services[i]);
continue; continue;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& service = services[i]["Service"]; const BUTIL_RAPIDJSON_NAMESPACE::Value& service = itr->value;
if (!service.HasMember("Address") || BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr_address =
!service["Address"].IsString() || service.FindMember("Address");
!service.HasMember("Port") || BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr_port =
!service["Port"].IsUint()) { service.FindMember("Port");
if (itr_address == service.MemberEnd() ||
!itr_address->value.IsString() ||
itr_port == service.MemberEnd() ||
!itr_port->value.IsUint()) {
LOG(ERROR) << "Service with no valid address or port: " LOG(ERROR) << "Service with no valid address or port: "
<< RapidjsonValueToString(service); << RapidjsonValueToString(service);
continue; continue;
...@@ -162,12 +169,14 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -162,12 +169,14 @@ int ConsulNamingService::GetServers(const char* service_name,
ServerNode node; ServerNode node;
node.addr = end_point; node.addr = end_point;
if (service.HasMember("Tags")) { itr = service.FindMember("Tags");
if (service["Tags"].IsArray()) { if (itr != service.MemberEnd()) {
if (service["Tags"].Size() > 0) { if (itr->value.IsArray()) {
if (itr->value.Size() > 0) {
// Tags in consul is an array, here we only use the first one. // Tags in consul is an array, here we only use the first one.
if (service["Tags"][0].IsString()) { const BUTIL_RAPIDJSON_NAMESPACE::Value& tag = itr->value[0];
node.tag = service["Tags"][0].GetString(); if (tag.IsString()) {
node.tag = tag.GetString();
} else { } else {
LOG(ERROR) << "First tag returned by consul is not string, service: " LOG(ERROR) << "First tag returned by consul is not string, service: "
<< RapidjsonValueToString(service); << RapidjsonValueToString(service);
......
...@@ -24,76 +24,92 @@ ...@@ -24,76 +24,92 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
#ifdef BILIBILI_INTERNAL
DEFINE_string(discovery_api_addr, "http://api.bilibili.co/discovery/nodes", DEFINE_string(discovery_api_addr, "http://api.bilibili.co/discovery/nodes",
"The address of discovery api"); "The address of discovery api");
#else
DEFINE_string(discovery_api_addr, "", "The address of discovery api");
#endif
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests"); DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "Environment of services"); DEFINE_string(discovery_env, "prod", "Environment of services");
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all"); DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
int DiscoveryNamingService::parse_nodes_result( int DiscoveryNamingService::ParseNodesResult(
const butil::IOBuf& buf, std::string* server_addr) { const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document nodes; BUTIL_RAPIDJSON_NAMESPACE::Document nodes;
const std::string response = buf.to_string(); const std::string response = buf.to_string();
nodes.Parse(response.c_str()); nodes.Parse(response.c_str());
if (!nodes.HasMember("data")) { BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr =
nodes.FindMember("data");
if (itr == nodes.MemberEnd()) {
LOG(ERROR) << "No data field in discovery nodes response"; LOG(ERROR) << "No data field in discovery nodes response";
return -1; return -1;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = nodes["data"]; const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr->value;
if (!data.IsArray()) { if (!data.IsArray()) {
LOG(ERROR) << "data field is not an array"; LOG(ERROR) << "data field is not an array";
return -1; return -1;
} }
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < data.Size(); ++i) { for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < data.Size(); ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& addr_item = data[i]; const BUTIL_RAPIDJSON_NAMESPACE::Value& addr_item = data[i];
if (!addr_item.HasMember("addr") || BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr_addr =
!addr_item["addr"].IsString() || addr_item.FindMember("addr");
!addr_item.HasMember("status") || BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr_status =
!addr_item["status"].IsUint() || addr_item.FindMember("status");
addr_item["status"].GetUint() != 0) {
if (itr_addr == addr_item.MemberEnd() ||
!itr_addr->value.IsString() ||
itr_status == addr_item.MemberEnd() ||
!itr_status->value.IsUint() ||
itr_status->value.GetUint() != 0) {
continue; continue;
} }
server_addr->assign(addr_item["addr"].GetString(), server_addr->assign(itr_addr->value.GetString(),
addr_item["addr"].GetStringLength()); itr_addr->value.GetStringLength());
// Currently, we just use the first successful result // Currently, we just use the first successful result
break; break;
} }
return 0; return 0;
} }
int DiscoveryNamingService::parse_fetchs_result( int DiscoveryNamingService::ParseFetchsResult(
const butil::IOBuf& buf, const butil::IOBuf& buf,
const char* service_name, const char* service_name,
std::vector<ServerNode>* servers) { std::vector<ServerNode>* servers) {
BUTIL_RAPIDJSON_NAMESPACE::Document d; BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string(); const std::string response = buf.to_string();
d.Parse(response.c_str()); d.Parse(response.c_str());
if (!d.HasMember("data")) { BUTIL_RAPIDJSON_NAMESPACE::Value::ConstMemberIterator itr =
d.FindMember("data");
if (itr == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery fetchs response"; LOG(ERROR) << "No data field in discovery fetchs response";
return -1; return -1;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = d["data"]; const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr->value;
if (!data.HasMember(service_name)) { itr = data.FindMember(service_name);
if (itr == data.MemberEnd()) {
LOG(ERROR) << "No " << service_name << " field in discovery response"; LOG(ERROR) << "No " << service_name << " field in discovery response";
return -1; return -1;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& services = data[service_name]; const BUTIL_RAPIDJSON_NAMESPACE::Value& services = itr->value;
if (!services.HasMember("instances")) { itr = services.FindMember("instances");
if (itr == services.MemberEnd()) {
LOG(ERROR) << "Fail to find instances"; LOG(ERROR) << "Fail to find instances";
return -1; return -1;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = services["instances"]; const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = itr->value;
if (!instances.IsArray()) { if (!instances.IsArray()) {
LOG(ERROR) << "Fail to parse instances as an array"; LOG(ERROR) << "Fail to parse instances as an array";
return -1; return -1;
} }
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) { for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
if (!instances[i].HasMember("addrs") || !instances[i]["addrs"].IsArray()) { itr = instances[i].FindMember("addrs");
if (itr == instances[i].MemberEnd() || !itr->value.IsArray()) {
LOG(ERROR) << "Fail to find addrs or addrs is not an array"; LOG(ERROR) << "Fail to find addrs or addrs is not an array";
return -1; return -1;
} }
const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = instances[i]["addrs"]; const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = itr->value;
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) { for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
if (!addrs[j].IsString()) { if (!addrs[j].IsString()) {
continue; continue;
...@@ -137,7 +153,7 @@ int DiscoveryNamingService::GetServers(const char* service_name, ...@@ -137,7 +153,7 @@ int DiscoveryNamingService::GetServers(const char* service_name,
return -1; return -1;
} }
std::string discovery_addr; std::string discovery_addr;
if (parse_nodes_result(cntl.response_attachment(), &discovery_addr) != 0) { if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
return -1; return -1;
} }
...@@ -159,7 +175,7 @@ int DiscoveryNamingService::GetServers(const char* service_name, ...@@ -159,7 +175,7 @@ int DiscoveryNamingService::GetServers(const char* service_name,
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText(); LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
return -1; return -1;
} }
return parse_fetchs_result(cntl.response_attachment(), service_name, servers); return ParseFetchsResult(cntl.response_attachment(), service_name, servers);
} }
void DiscoveryNamingService::Describe(std::ostream& os, void DiscoveryNamingService::Describe(std::ostream& os,
......
...@@ -26,17 +26,17 @@ namespace policy { ...@@ -26,17 +26,17 @@ namespace policy {
class DiscoveryNamingService : public PeriodicNamingService { class DiscoveryNamingService : public PeriodicNamingService {
private: private:
int GetServers(const char* service_name, int GetServers(const char* service_name,
std::vector<ServerNode>* servers); std::vector<ServerNode>* servers) override;
void Describe(std::ostream& os, const DescribeOptions&) const; void Describe(std::ostream& os, const DescribeOptions&) const override;
NamingService* New() const; NamingService* New() const override;
void Destroy(); void Destroy() override;
private: private:
int parse_nodes_result(const butil::IOBuf& buf, std::string* server_addr); int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
int parse_fetchs_result(const butil::IOBuf& buf, const char* service_name, int ParseFetchsResult(const butil::IOBuf& buf, const char* service_name,
std::vector<ServerNode>* servers); std::vector<ServerNode>* servers);
Channel _channel; Channel _channel;
......
...@@ -515,12 +515,12 @@ TEST(NamingServiceTest, discovery_parse_function) { ...@@ -515,12 +515,12 @@ TEST(NamingServiceTest, discovery_parse_function) {
brpc::policy::DiscoveryNamingService dcns; brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf; butil::IOBuf buf;
buf.append(s_fetchs_result); buf.append(s_fetchs_result);
ASSERT_EQ(0, dcns.parse_fetchs_result(buf, "admin.test", &servers)); ASSERT_EQ(0, dcns.ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size()); ASSERT_EQ((size_t)2, servers.size());
buf.clear(); buf.clear();
buf.append(s_nodes_result); buf.append(s_nodes_result);
std::string server; std::string server;
ASSERT_EQ(0, dcns.parse_nodes_result(buf, &server)); ASSERT_EQ(0, dcns.ParseNodesResult(buf, &server));
ASSERT_EQ("127.0.0.1:8635", server); ASSERT_EQ("127.0.0.1:8635", server);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment