Commit 80938ef7 authored by zhujiashun's avatar zhujiashun

add discovery naming service

parent e67bd7f8
......@@ -30,6 +30,7 @@
#include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/discovery_naming_service.h"
// Load Balancers
#include "brpc/policy/round_robin_load_balancer.h"
......@@ -120,6 +121,7 @@ struct GlobalExtensions {
DomainNamingService dns;
RemoteFileNamingService rfns;
ConsulNamingService cns;
DiscoveryNamingService dcns;
RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb;
......@@ -339,6 +341,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns);
// Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
......
// Copyright (c) 2018 BiliBili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <gflags/gflags.h>
#include "butil/third_party/rapidjson/document.h"
#include "butil/string_printf.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/policy/discovery_naming_service.h"
namespace brpc {
namespace policy {
DEFINE_string(discovery_api_addr, "http://api.bilibili.co/discovery/nodes",
"The address of discovery api");
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "The environment of services");
int DiscoveryNamingService::parse_nodes_result(
const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document nodes;
const std::string response = buf.to_string();
nodes.Parse(response.c_str());
if (!nodes.HasMember("data")) {
LOG(ERROR) << "No data field in discovery nodes response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = nodes["data"];
if (!data.IsArray()) {
LOG(ERROR) << "data field is not an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < data.Size(); ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& addr_item = data[i];
if (!addr_item.HasMember("addr") ||
!addr_item["addr"].IsString() ||
!addr_item.HasMember("status") ||
!addr_item["status"].IsUint() ||
addr_item["status"].GetUint() != 0) {
continue;
}
server_addr->assign(addr_item["addr"].GetString(),
addr_item["addr"].GetStringLength());
// Currently, we just use the first successful result
break;
}
return 0;
}
int DiscoveryNamingService::parse_fetchs_result(
const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
if (!d.HasMember("data")) {
LOG(ERROR) << "No data field in discovery fetchs response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = d["data"];
if (!data.HasMember(service_name)) {
LOG(ERROR) << "No " << service_name << " field in discovery response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& services = data[service_name];
if (!services.HasMember("instances")) {
LOG(ERROR) << "Fail to find instances";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = services["instances"];
if (!instances.IsArray()) {
LOG(ERROR) << "Fail to parse instances as an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
if (!instances[i].HasMember("addrs") || !instances[i]["addrs"].IsArray()) {
LOG(ERROR) << "Fail to find addrs or addrs is not an array";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = instances[i]["addrs"];
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
if (!addrs[j].IsString()) {
continue;
}
// The result returned by discovery include protocol prefix, such as
// http://172.22.35.68:6686, which should be removed.
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
addr.remove_prefix(pos + 3);
}
ServerNode node;
if (str2endpoint(addr.data(), &node.addr) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
servers->push_back(node);
}
}
return 0;
}
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
if (!_is_initialized) {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return -1;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr + "/discovery/nodes";
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return -1;
}
std::string discovery_addr;
if (parse_nodes_result(cntl.response_attachment(), &discovery_addr) != 0) {
return -1;
}
if (_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << discovery_addr;
return -1;
}
_is_initialized = true;
}
servers->clear();
Controller cntl;
// TODO(zhujiashun): pass zone from service_name
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=1", service_name,
FLAGS_discovery_env.c_str());
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
return -1;
}
return parse_fetchs_result(cntl.response_attachment(), service_name, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2018 BiliBili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
#define BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"
namespace brpc {
namespace policy {
class DiscoveryNamingService : public PeriodicNamingService {
private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers);
void Describe(std::ostream& os, const DescribeOptions&) const;
NamingService* New() const;
void Destroy();
private:
int parse_nodes_result(const butil::IOBuf& buf, std::string* server_addr);
int parse_fetchs_result(const butil::IOBuf& buf, const char* service_name,
std::vector<ServerNode>* servers);
Channel _channel;
bool _is_initialized = false;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
......@@ -15,6 +15,7 @@
#include "brpc/policy/file_naming_service.h"
#include "brpc/policy/list_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/discovery_naming_service.h"
#include "echo.pb.h"
#include "brpc/server.h"
......@@ -419,4 +420,20 @@ TEST(NamingServiceTest, consul_with_backup_file) {
brpc::FLAGS_health_check_interval = saved_hc_interval;
}
TEST(NamingServiceTest, discovery_parse_function) {
std::vector<brpc::ServerNode> servers;
brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf;
buf.append(R"({"code":0,"message":"0","ttl":1,"data":{"admin.test":{"instances":[{"region":"","zone":"sh001","env":"uat","appid":"admin.test","treeid":0,"hostname":"host123","http":"","rpc":"","version":"123","metadata":{},"addrs":["http://127.0.0.1:8999", "gorpc://127.0.1.1:9000"],"status":1,"reg_timestamp":1539001034551496412,"up_timestamp":1539001034551496412,"renew_timestamp":1539001034551496412,"dirty_timestamp":1539001034551496412,"latest_timestamp":1539001034551496412}],"zone_instances":{"sh001":[{"region":"","zone":"sh001","env":"uat","appid":"admin.test","treeid":0,"hostname":"host123","http":"","rpc":"","version":"123","metadata":{},"addrs":["http://127.0.0.1:8999", "gorpc://127.0.1.1:9000"],"status":1,"reg_timestamp":1539001034551496412,"up_timestamp":1539001034551496412,"renew_timestamp":1539001034551496412,"dirty_timestamp":1539001034551496412,"latest_timestamp":1539001034551496412}]},"latest_timestamp":1539001034551496412,"latest_timestamp_str":"1539001034"}}})");
ASSERT_EQ(0, dcns.parse_fetchs_result(buf, "admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
buf.clear();
buf.append(R"({ "code": 0, "message": "0", "ttl": 1, "data": [ { "addr": "172.18.33.50:7171", "status": 0, "zone": "" }, { "addr": "172.18.33.51:7171", "status": 0, "zone": "" }, { "addr": "172.18.33.52:7171", "status": 0, "zone": "" }]})");
std::string server;
ASSERT_EQ(0, dcns.parse_nodes_result(buf, &server));
ASSERT_EQ("172.18.33.50:7171", server);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment