Unverified Commit d6d86cf0 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #525 from zyearn/discovery

add discovery naming service
parents e67bd7f8 05ccc6d1
......@@ -30,6 +30,7 @@
#include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/discovery_naming_service.h"
// Load Balancers
#include "brpc/policy/round_robin_load_balancer.h"
......@@ -120,6 +121,7 @@ struct GlobalExtensions {
DomainNamingService dns;
RemoteFileNamingService rfns;
ConsulNamingService cns;
DiscoveryNamingService dcns;
RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb;
......@@ -339,6 +341,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns);
// Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
......
......@@ -135,17 +135,20 @@ int ConsulNamingService::GetServers(const char* service_name,
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < services.Size(); ++i) {
if (!services[i].HasMember("Service")) {
auto itr_service = services[i].FindMember("Service");
if (itr_service == services[i].MemberEnd()) {
LOG(ERROR) << "No service info in node: "
<< RapidjsonValueToString(services[i]);
continue;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& service = services[i]["Service"];
if (!service.HasMember("Address") ||
!service["Address"].IsString() ||
!service.HasMember("Port") ||
!service["Port"].IsUint()) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& service = itr_service->value;
auto itr_address = service.FindMember("Address");
auto itr_port = service.FindMember("Port");
if (itr_address == service.MemberEnd() ||
!itr_address->value.IsString() ||
itr_port == service.MemberEnd() ||
!itr_port->value.IsUint()) {
LOG(ERROR) << "Service with no valid address or port: "
<< RapidjsonValueToString(service);
continue;
......@@ -162,12 +165,14 @@ int ConsulNamingService::GetServers(const char* service_name,
ServerNode node;
node.addr = end_point;
if (service.HasMember("Tags")) {
if (service["Tags"].IsArray()) {
if (service["Tags"].Size() > 0) {
auto itr_tags = service.FindMember("Tags");
if (itr_tags != service.MemberEnd()) {
if (itr_tags->value.IsArray()) {
if (itr_tags->value.Size() > 0) {
// Tags in consul is an array, here we only use the first one.
if (service["Tags"][0].IsString()) {
node.tag = service["Tags"][0].GetString();
const BUTIL_RAPIDJSON_NAMESPACE::Value& tag = itr_tags->value[0];
if (tag.IsString()) {
node.tag = tag.GetString();
} else {
LOG(ERROR) << "First tag returned by consul is not string, service: "
<< RapidjsonValueToString(service);
......
// Copyright (c) 2018 BiliBili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <gflags/gflags.h>
#include "butil/third_party/rapidjson/document.h"
#include "butil/string_printf.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/policy/discovery_naming_service.h"
namespace brpc {
namespace policy {
#ifdef BILIBILI_INTERNAL
DEFINE_string(discovery_api_addr, "http://api.bilibili.co/discovery/nodes",
"The address of discovery api");
#else
DEFINE_string(discovery_api_addr, "", "The address of discovery api");
#endif
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
DEFINE_string(discovery_env, "prod", "Environment of services");
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
int DiscoveryNamingService::ParseNodesResult(
const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document nodes;
const std::string response = buf.to_string();
nodes.Parse(response.c_str());
auto itr = nodes.FindMember("data");
if (itr == nodes.MemberEnd()) {
LOG(ERROR) << "No data field in discovery nodes response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr->value;
if (!data.IsArray()) {
LOG(ERROR) << "data field is not an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < data.Size(); ++i) {
const BUTIL_RAPIDJSON_NAMESPACE::Value& addr_item = data[i];
auto itr_addr = addr_item.FindMember("addr");
auto itr_status = addr_item.FindMember("status");
if (itr_addr == addr_item.MemberEnd() ||
!itr_addr->value.IsString() ||
itr_status == addr_item.MemberEnd() ||
!itr_status->value.IsUint() ||
itr_status->value.GetUint() != 0) {
continue;
}
server_addr->assign(itr_addr->value.GetString(),
itr_addr->value.GetStringLength());
// Currently, we just use the first successful result
break;
}
return 0;
}
int DiscoveryNamingService::ParseFetchsResult(
const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
auto itr_data = d.FindMember("data");
if (itr_data == d.MemberEnd()) {
LOG(ERROR) << "No data field in discovery fetchs response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr_data->value;
auto itr_service = data.FindMember(service_name);
if (itr_service == data.MemberEnd()) {
LOG(ERROR) << "No " << service_name << " field in discovery response";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& services = itr_service->value;
auto itr_instances = services.FindMember("instances");
if (itr_instances == services.MemberEnd()) {
LOG(ERROR) << "Fail to find instances";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = itr_instances->value;
if (!instances.IsArray()) {
LOG(ERROR) << "Fail to parse instances as an array";
return -1;
}
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
auto itr = instances[i].FindMember("addrs");
if (itr == instances[i].MemberEnd() || !itr->value.IsArray()) {
LOG(ERROR) << "Fail to find addrs or addrs is not an array";
return -1;
}
const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = itr->value;
for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
if (!addrs[j].IsString()) {
continue;
}
// The result returned by discovery include protocol prefix, such as
// http://172.22.35.68:6686, which should be removed.
butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
butil::StringPiece::size_type pos = addr.find("://");
if (pos != butil::StringPiece::npos) {
addr.remove_prefix(pos + 3);
}
ServerNode node;
// Variable addr contains data from addrs[j].GetString(), it is a
// null-terminated string, so it is safe to pass addr.data() as the
// first parameter to str2endpoint.
if (str2endpoint(addr.data(), &node.addr) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
servers->push_back(node);
}
}
return 0;
}
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
if (!_is_initialized) {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return -1;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return -1;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
return -1;
}
if (_channel.Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << discovery_addr;
return -1;
}
_is_initialized = true;
}
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
return -1;
}
return ParseFetchsResult(cntl.response_attachment(), service_name, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2018 BiliBili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
#define BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
#include "brpc/periodic_naming_service.h"
#include "brpc/channel.h"
namespace brpc {
namespace policy {
class DiscoveryNamingService : public PeriodicNamingService {
private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers) override;
void Describe(std::ostream& os, const DescribeOptions&) const override;
NamingService* New() const override;
void Destroy() override;
private:
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
int ParseFetchsResult(const butil::IOBuf& buf, const char* service_name,
std::vector<ServerNode>* servers);
Channel _channel;
bool _is_initialized = false;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_DISCOVERY_NAMING_SERVICE_H
......@@ -15,6 +15,7 @@
#include "brpc/policy/file_naming_service.h"
#include "brpc/policy/list_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/discovery_naming_service.h"
#include "echo.pb.h"
#include "brpc/server.h"
......@@ -27,6 +28,8 @@ namespace policy {
DECLARE_bool(consul_enable_degrade_to_file_naming_service);
DECLARE_string(consul_file_naming_service_dir);
DECLARE_string(consul_service_discovery_url);
DECLARE_string(discovery_api_addr);
DECLARE_string(discovery_env);
} // policy
} // brpc
......@@ -419,4 +422,148 @@ TEST(NamingServiceTest, consul_with_backup_file) {
brpc::FLAGS_health_check_interval = saved_hc_interval;
}
static const std::string s_fetchs_result = R"({
"code":0,
"message":"0",
"ttl":1,
"data":{
"admin.test":{
"instances":[
{
"region":"",
"zone":"sh001",
"env":"uat",
"appid":"admin.test",
"treeid":0,
"hostname":"host123",
"http":"",
"rpc":"",
"version":"123",
"metadata":{
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
"up_timestamp":1539001034551496412,
"renew_timestamp":1539001034551496412,
"dirty_timestamp":1539001034551496412,
"latest_timestamp":1539001034551496412
}
],
"zone_instances":{
"sh001":[
{
"region":"",
"zone":"sh001",
"env":"uat",
"appid":"admin.test",
"treeid":0,
"hostname":"host123",
"http":"",
"rpc":"",
"version":"123",
"metadata":{
},
"addrs":[
"http://127.0.0.1:8999",
"gorpc://127.0.1.1:9000"
],
"status":1,
"reg_timestamp":1539001034551496412,
"up_timestamp":1539001034551496412,
"renew_timestamp":1539001034551496412,
"dirty_timestamp":1539001034551496412,
"latest_timestamp":1539001034551496412
}
]
},
"latest_timestamp":1539001034551496412,
"latest_timestamp_str":"1539001034"
}
}
})";
static std::string s_nodes_result = R"({
"code": 0,
"message": "0",
"ttl": 1,
"data": [
{
"addr": "127.0.0.1:8635",
"status": 0,
"zone": ""
}, {
"addr": "172.18.33.51:7171",
"status": 0,
"zone": ""
}, {
"addr": "172.18.33.52:7171",
"status": 0,
"zone": ""
}
]
})";
TEST(NamingServiceTest, discovery_parse_function) {
std::vector<brpc::ServerNode> servers;
brpc::policy::DiscoveryNamingService dcns;
butil::IOBuf buf;
buf.append(s_fetchs_result);
ASSERT_EQ(0, dcns.ParseFetchsResult(buf, "admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
buf.clear();
buf.append(s_nodes_result);
std::string server;
ASSERT_EQ(0, dcns.ParseNodesResult(buf, &server));
ASSERT_EQ("127.0.0.1:8635", server);
}
class DiscoveryNamingServiceImpl : public test::DiscoveryNamingService {
public:
DiscoveryNamingServiceImpl () {}
virtual ~DiscoveryNamingServiceImpl() {}
void Nodes(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(s_nodes_result);
}
void Fetchs(google::protobuf::RpcController* cntl_base,
const test::HttpRequest*,
test::HttpResponse*,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(s_fetchs_result);
}
};
TEST(NamingServiceTest, discovery_sanity) {
brpc::policy::FLAGS_discovery_api_addr = "http://127.0.0.1:8635/discovery/nodes";
brpc::Server server;
DiscoveryNamingServiceImpl svc;
std::string rest_mapping =
"/discovery/nodes => Nodes, "
"/discovery/fetchs => Fetchs";
ASSERT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE,
rest_mapping.c_str()));
ASSERT_EQ(0, server.Start("localhost:8635", NULL));
brpc::policy::DiscoveryNamingService dcns;
std::vector<brpc::ServerNode> servers;
ASSERT_EQ(0, dcns.GetServers("admin.test", &servers));
ASSERT_EQ((size_t)2, servers.size());
}
} //namespace
......@@ -53,6 +53,11 @@ service UserNamingService {
rpc Touch(HttpRequest) returns (HttpResponse);
};
service DiscoveryNamingService {
rpc Nodes(HttpRequest) returns (HttpResponse);
rpc Fetchs(HttpRequest) returns (HttpResponse);
};
enum State0 {
STATE0_NUM_0 = 0;
STATE0_NUM_1 = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment