diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 63b9fd62a1785de7c3493afa235cb037e8a99511..7c6a5496846063770dc71cbf004beff307b4b23a 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -1,11 +1,11 @@ // Copyright (c) 2014 Baidu, Inc. -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,6 +28,7 @@ #include "brpc/policy/list_naming_service.h" #include "brpc/policy/domain_naming_service.h" #include "brpc/policy/remote_file_naming_service.h" +#include "brpc/policy/consul_naming_service.h" // Load Balancers #include "brpc/policy/round_robin_load_balancer.h" @@ -105,6 +106,7 @@ struct GlobalExtensions { ListNamingService lns; DomainNamingService dns; RemoteFileNamingService rfns; + ConsulNamingService cns; RoundRobinLoadBalancer rr_lb; WeightedRoundRobinLoadBalancer wrr_lb; @@ -163,7 +165,7 @@ extern butil::static_atomic<int> g_running_server_count; static int GetRunningServerCount(void*) { return g_running_server_count.load(butil::memory_order_relaxed); } - + // Update global stuff periodically. static void* GlobalUpdate(void*) { // Expose variables. @@ -180,7 +182,7 @@ static void* GlobalUpdate(void*) { "iobuf_block_memory", GetIOBufBlockMemory, NULL); bvar::PassiveStatus<int> var_running_server_count( "rpc_server_count", GetRunningServerCount, NULL); - + butil::FileWatcher fw; if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) { LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'"; @@ -201,16 +203,16 @@ static void* GlobalUpdate(void*) { break; } consecutive_nosleep = 0; - } else { + } else { if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) { consecutive_nosleep = 0; LOG(WARNING) << __FUNCTION__ << " is too busy!"; } } last_time_us = butil::gettimeofday_us(); - + TrackMe(); - + if (!IsDummyServerRunning() && g_running_server_count.load(butil::memory_order_relaxed) == 0 && fw.check_and_consume() > 0) { @@ -279,14 +281,14 @@ static void GlobalInitializeOrDieImpl() { // may be called before main() only seeing gflags with default // // values even if the gflags will be set after main(). // ////////////////////////////////////////////////////////////////// - - // Ignore SIGPIPE. + + // Ignore SIGPIPE. struct sigaction oldact; - if (sigaction(SIGPIPE, NULL, &oldact) != 0 || + if (sigaction(SIGPIPE, NULL, &oldact) != 0 || (oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) { CHECK(NULL == signal(SIGPIPE, SIG_IGN)); } - + // Make GOOGLE_LOG print to comlog device SetLogHandler(&BaiduStreamingLogHandler); @@ -303,7 +305,7 @@ static void GlobalInitializeOrDieImpl() { // Defined in http_rpc_protocol.cpp InitCommonStrings(); - + // Leave memory of these extensions to process's clean up. g_ext = new(std::nothrow) GlobalExtensions(); if (NULL == g_ext) { @@ -317,6 +319,7 @@ static void GlobalInitializeOrDieImpl() { NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns); NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns); NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns); + NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns); // Load Balancers LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb); @@ -355,7 +358,7 @@ static void GlobalInitializeOrDieImpl() { } Protocol streaming_protocol = { ParseStreamingMessage, - NULL, NULL, ProcessStreamingMessage, + NULL, NULL, ProcessStreamingMessage, ProcessStreamingMessage, NULL, NULL, NULL, CONNECTION_TYPE_SINGLE, "streaming_rpc" }; @@ -363,7 +366,7 @@ static void GlobalInitializeOrDieImpl() { if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) { exit(1); } - + Protocol http_protocol = { ParseHttpMessage, SerializeHttpRequest, PackHttpRequest, ProcessHttpRequest, ProcessHttpResponse, @@ -374,7 +377,7 @@ static void GlobalInitializeOrDieImpl() { if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) { exit(1); } - + Protocol hulu_protocol = { ParseHuluMessage, SerializeRequestDefault, PackHuluRequest, ProcessHuluRequest, ProcessHuluResponse, @@ -417,7 +420,7 @@ static void GlobalInitializeOrDieImpl() { exit(1); } - // Only valid at server side. We generalize all the protocols that + // Only valid at server side. We generalize all the protocols that // prefixes with nshead as `nshead_protocol' and specify the content // parsing after nshead by ServerOptions.nshead_service. Protocol nshead_protocol = { ParseNsheadMessage, @@ -488,7 +491,7 @@ static void GlobalInitializeOrDieImpl() { if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) { exit(1); } - + Protocol rtmp_protocol = { ParseRtmpMessage, SerializeRtmpRequest, PackRtmpRequest, @@ -536,7 +539,7 @@ static void GlobalInitializeOrDieImpl() { InitUserCodeBackupPoolOnceOrDie(); } - // We never join GlobalUpdate, let it quit with the process. + // We never join GlobalUpdate, let it quit with the process. bthread_t th; CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0) << "Fail to start GlobalUpdate"; diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp new file mode 100644 index 0000000000000000000000000000000000000000..853de11fe1958bfa72f42ec382c9259434ff9bd8 --- /dev/null +++ b/src/brpc/policy/consul_naming_service.cpp @@ -0,0 +1,218 @@ +// Copyright (c) 2014 Baidu, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Yaofu Zhang (zhangyaofu@qiyi.com) + +#include <gflags/gflags.h> +#include <string> // std::string +#include <set> // std::set +#include "butil/third_party/rapidjson/document.h" +#include "butil/time/time.h" +#include "bthread/bthread.h" +#include "brpc/log.h" +#include "brpc/channel.h" +#include "brpc/policy/file_naming_service.h" +#include "brpc/policy/consul_naming_service.h" + + +namespace brpc { +namespace policy { + +DEFINE_string(consul_agent_addr, "http://127.0.0.1:8500", + "The query string of request consul for discovering service."); +DEFINE_string(consul_service_discovery_url, + "/v1/health/service/", + "The url of consul for discovering service."); +DEFINE_string(consul_url_parameter, "?stale&passing", + "The query string of request consul for discovering service."); +DEFINE_int32(consul_connect_timeout_ms, 200, + "Timeout for creating connections to consul in milliseconds"); +DEFINE_int32(consul_blocking_query_wait_secs, 600, + "Maximum duration for the blocking request in secs."); +DEFINE_bool(consul_enable_degrade_to_file_naming_service, false, + "Use local backup file when consul cannot connect"); +DEFINE_string(consul_file_naming_service_dir, "", + "When it degraded to file naming service, the file with name of the " + "service name will be searched in this dir to use."); +DEFINE_int32(consul_retry_interval_ms, 5, + "Wait so many milliseconds before retry when error happens"); + +constexpr char kConsulIndex[] = "X-Consul-Index"; + +int ConsulNamingService::DegradeToFilenamingServiceIfNeed(const char* service_name, + std::vector<ServerNode>* servers) { + if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) { + _backup_file_loaded = true; + const std::string file(FLAGS_consul_file_naming_service_dir + service_name); + LOG(INFO) << "Load server list from " << file; + FileNamingService fns; + return fns.GetServers(file.c_str(), servers); + } + return -1; +} + +int ConsulNamingService::GetServers(const char* service_name, + std::vector<ServerNode>* servers) { + if (!_consul_connected) { + ChannelOptions opt; + opt.protocol = PROTOCOL_HTTP; + opt.connect_timeout_ms = FLAGS_consul_connect_timeout_ms; + opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond; + if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) { + LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr; + return DegradeToFilenamingServiceIfNeed(service_name, servers); + } + _consul_connected = true; + } + + if (_consul_url.empty()) { + _consul_url.append(FLAGS_consul_service_discovery_url); + _consul_url.append(service_name); + _consul_url.append(FLAGS_consul_url_parameter); + } + + servers->clear(); + std::string consul_url(_consul_url); + if (!_consul_index.empty()) { + consul_url.append("&index="); + consul_url.append(_consul_index); + consul_url.append("&wait="); + consul_url.append(std::to_string(FLAGS_consul_blocking_query_wait_secs)); + consul_url.push_back('s'); + } + + Controller cntl; + cntl.http_request().uri() = consul_url; + _channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); + if (cntl.Failed()) { + LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr; + return DegradeToFilenamingServiceIfNeed(service_name, servers); + } + + const std::string* index = cntl.http_response().GetHeader(kConsulIndex); + if (index != nullptr) { + if (*index == _consul_index) { + LOG_EVERY_N(ERROR, 100) << "There is no service changed for the list of " + << service_name + << ", consul_index: " << _consul_index; + return -1; + } + } else { + LOG(ERROR) << "Failed to parse consul index of " << service_name << "."; + return -1; + } + + // Sort/unique the inserted vector is faster, but may have a different order + // of addresses from the file. To make assertions in tests easier, we use + // set to de-duplicate and keep the order. + std::set<ServerNode> presence; + + rapidjson::Document services; + services.Parse(cntl.response_attachment().to_string().c_str()); + if (!services.IsArray()) { + return -1; + } + + for (rapidjson::SizeType i = 0; i < services.Size(); ++i) { + if (!services[i].HasMember("Service")) { + continue; + } + if (!services[i]["Service"].HasMember("Address") || + !services[i]["Service"]["Address"].IsString() || + !services[i]["Service"].HasMember("Port") || + !services[i]["Service"]["Port"].IsUint()) { + continue; + } + butil::EndPoint end_point; + if (str2endpoint(services[i]["Service"]["Address"].GetString(), + services[i]["Service"]["Port"].GetUint(), + &end_point) != 0) { + LOG(ERROR) << "Invalid address=`" << services[i]["Service"]["Address"].GetString() << '\'' + << " , port= " << services[i]["Service"]["Port"].GetUint(); + continue; + } + ServerNode node; + node.addr = end_point; + // Tags in consul is an array, here we just use the first one. + if (services[i]["Service"].HasMember("Tags") && + services[i]["Service"]["Tags"].IsArray() && + services[i]["Service"]["Tags"].Size() > 0 && + services[i]["Service"]["Tags"][0].IsString()) { + node.tag = services[i]["Service"]["Tags"][0].GetString(); + } + + if (presence.insert(node).second) { + servers->push_back(node); + } else { + RPC_VLOG << "Duplicated server=" << node; + } + } + + _consul_index = *index; + + RPC_VLOG << "Got " << servers->size() + << (servers->size() > 1 ? " servers" : " server") + << " from " << service_name; + return 0; +} + +int ConsulNamingService::RunNamingService(const char* service_name, + NamingServiceActions* actions) { + std::vector<ServerNode> servers; + bool ever_reset = false; + for (;;) { + servers.clear(); + const int rc = GetServers(service_name, &servers); + if (rc == 0) { + ever_reset = true; + actions->ResetServers(servers); + } else { + if (!ever_reset) { + // ResetServers must be called at first time even if GetServers + // failed, to wake up callers to `WaitForFirstBatchOfServers' + ever_reset = true; + servers.clear(); + actions->ResetServers(servers); + } + if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMillisecondsPerSecond) < 0) { + if (errno == ESTOP) { + RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); + return 0; + } + PLOG(FATAL) << "Fail to sleep"; + return -1; + } + } + } + CHECK(false); + return -1; +} + + +void ConsulNamingService::Describe(std::ostream& os, + const DescribeOptions&) const { + os << "consul"; + return; +} + +NamingService* ConsulNamingService::New() const { + return new ConsulNamingService; +} + +void ConsulNamingService::Destroy() { + delete this; +} + +} // namespace policy +} // namespace brpc diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h new file mode 100644 index 0000000000000000000000000000000000000000..d163893e3e0230da5ee65008285c7db65f857009 --- /dev/null +++ b/src/brpc/policy/consul_naming_service.h @@ -0,0 +1,57 @@ +// Copyright (c) 2014 Baidu, Inc.G +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Authors: Yaofu Zhang (zhangyaofu@qiyi.com) + +#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE +#define BRPC_POLICY_CONSUL_NAMING_SERVICE + +#include "brpc/naming_service.h" +#include "brpc/channel.h" + + +namespace brpc { +class Channel; +namespace policy { + +class ConsulNamingService : public NamingService { +private: + int RunNamingService(const char* service_name, + NamingServiceActions* actions); + + int GetServers(const char* service_name, + std::vector<ServerNode>* servers); + + void Describe(std::ostream& os, const DescribeOptions&) const; + + NamingService* New() const; + + int DegradeToFilenamingServiceIfNeed(const char* service_name, + std::vector<ServerNode>* servers); + + void Destroy(); + +private: + Channel _channel; + std::string _consul_index; + std::string _consul_url; + bool _backup_file_loaded = false; + bool _consul_connected = false; +}; + +} // namespace policy +} // namespace brpc + + +#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE diff --git a/src/brpc/policy/file_naming_service.h b/src/brpc/policy/file_naming_service.h index 9df29adcf68d907d2bd9c2b28fa4c91110e2df3c..20616b6b37043305e070e5f2f43a02c5d4734445 100644 --- a/src/brpc/policy/file_naming_service.h +++ b/src/brpc/policy/file_naming_service.h @@ -1,11 +1,11 @@ // Copyright (c) 2014 Baidu, Inc. -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,17 +24,18 @@ namespace brpc { namespace policy { class FileNamingService : public NamingService { +friend class ConsulNamingService; private: int RunNamingService(const char* service_name, NamingServiceActions* actions); - + int GetServers(const char *service_name, std::vector<ServerNode>* servers); void Describe(std::ostream& os, const DescribeOptions&) const; NamingService* New() const; - + void Destroy(); }; diff --git a/test/brpc_naming_service_unittest.cpp b/test/brpc_naming_service_unittest.cpp index a26c3ff4e013a659227d0269bbd294056eecb7d5..2de33f3a813d2d9cf55f4085fbfb27bf304f08f1 100644 --- a/test/brpc_naming_service_unittest.cpp +++ b/test/brpc_naming_service_unittest.cpp @@ -6,9 +6,11 @@ #include <vector> #include "butil/string_printf.h" #include "butil/files/temp_file.h" +#include "bthread/bthread.h" #ifdef BAIDU_INTERNAL #include "brpc/policy/baidu_naming_service.h" #endif +#include "brpc/policy/consul_naming_service.h" #include "brpc/policy/domain_naming_service.h" #include "brpc/policy/file_naming_service.h" #include "brpc/policy/list_naming_service.h" @@ -16,6 +18,17 @@ #include "echo.pb.h" #include "brpc/server.h" + +namespace brpc { +namespace policy { + +DECLARE_bool(consul_enable_degrade_to_file_naming_service); +DECLARE_string(consul_file_naming_service_dir); +DECLARE_string(consul_service_discovery_url); + +} // policy +} // brpc + namespace { TEST(NamingServiceTest, sanity) { std::vector<brpc::ServerNode> servers; @@ -69,7 +82,7 @@ TEST(NamingServiceTest, sanity) { oss << servers[i]; ASSERT_EQ(address_list[i], oss.str()) << "i=" << i; } - + std::string s; for (size_t i = 0; i < ARRAY_SIZE(address_list); ++i) { ASSERT_EQ(0, butil::string_appendf(&s, "%s,", address_list[i])); @@ -101,7 +114,7 @@ TEST(NamingServiceTest, invalid_port) { TEST(NamingServiceTest, wrong_name) { std::vector<brpc::ServerNode> servers; -#ifdef BAIDU_INTERNAL +#ifdef BAIDU_INTERNAL brpc::policy::BaiduNamingService bns; ASSERT_EQ(-1, bns.GetServers("Wrong", &servers)); #endif @@ -160,7 +173,7 @@ public: brpc::ClosureGuard done_guard(done); touch_count.fetch_add(1); } - + butil::atomic<int64_t> list_names_count; butil::atomic<int64_t> touch_count; }; @@ -202,4 +215,203 @@ TEST(NamingServiceTest, remotefile) { ASSERT_EQ(expected_servers[i], servers[i]); } } + +class ConsulNamingServiceImpl : public test::UserNamingService { +public: + ConsulNamingServiceImpl() : list_names_count(0), touch_count(0) { + } + ~ConsulNamingServiceImpl() { } + void ListNames(google::protobuf::RpcController* cntl_base, + const test::HttpRequest*, + test::HttpResponse*, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = (brpc::Controller*)cntl_base; + cntl->http_response().SetHeader("X-Consul-Index", "1"); + cntl->response_attachment().append( + R"([ + { + "Node": { + "ID": "44454c4c-4e00-1050-8052-b7c04f4b5931", + "Node": "sh-qs-10.121.36.189", + "Address": "10.121.36.189", + "Datacenter": "shjj", + "TaggedAddresses": { + "lan": "10.121.36.189", + "wan": "10.121.36.189" + }, + "Meta": { + "consul-network-segment": "" + }, + "CreateIndex": 4820296, + "ModifyIndex": 4823818 + }, + "Service": { + "ID": "10.121.36.189_8003_qs_show_leaf", + "Service": "qs_show_leaf", + "Tags": ["1"], + "Address": "10.121.36.189", + "Port": 8003, + "EnableTagOverride": false, + "CreateIndex": 6515285, + "ModifyIndex": 6515285 + }, + "Checks": [ + { + "Node": "sh-qs-10.121.36.189", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "Agent alive and reachable", + "ServiceID": "", + "ServiceName": "", + "ServiceTags": [ ], + "CreateIndex": 4820296, + "ModifyIndex": 4820296 + }, + { + "Node": "sh-qs-10.121.36.189", + "CheckID": "service:10.121.36.189_8003_qs_show_leaf", + "Name": "Service 'qs_show_leaf' check", + "Status": "passing", + "Notes": "", + "Output": "TCP connect 10.121.36.189:8003: Success", + "ServiceID": "10.121.36.189_8003_qs_show_leaf", + "ServiceName": "qs_show_leaf", + "ServiceTags": [ ], + "CreateIndex": 6515285, + "ModifyIndex": 6702198 + } + ] + }, + { + "Node": { + "ID": "44454c4c-4b00-1050-8052-b6c04f4b5931", + "Node": "sh-qs-10.121.36.190", + "Address": "10.121.36.190", + "Datacenter": "shjj", + "TaggedAddresses": { + "lan": "10.121.36.190", + "wan": "10.121.36.190" + }, + "Meta": { + "consul-network-segment": "" + }, + "CreateIndex": 4820296, + "ModifyIndex": 4823751 + }, + "Service": { + "ID": "10.121.36.190_8003_qs_show_leaf", + "Service": "qs_show_leaf", + "Tags": ["2"], + "Address": "10.121.36.190", + "Port": 8003, + "EnableTagOverride": false, + "CreateIndex": 6515635, + "ModifyIndex": 6515635 + }, + "Checks": [ + { + "Node": "sh-qs-10.121.36.190", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "Agent alive and reachable", + "ServiceID": "", + "ServiceName": "", + "ServiceTags": [ ], + "CreateIndex": 4820296, + "ModifyIndex": 4820296 + }, + { + "Node": "sh-qs-10.121.36.190", + "CheckID": "service:10.121.36.190_8003_qs_show_leaf", + "Name": "Service 'qs_show_leaf' check", + "Status": "passing", + "Notes": "", + "Output": "TCP connect 10.121.36.190:8003: Success", + "ServiceID": "10.121.36.190_8003_qs_show_leaf", + "ServiceName": "qs_show_leaf", + "ServiceTags": [ ], + "CreateIndex": 6515635, + "ModifyIndex": 6705515 + } + ] + } + ])"); + list_names_count.fetch_add(1); + } + void Touch(google::protobuf::RpcController*, + const test::HttpRequest*, + test::HttpResponse*, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + touch_count.fetch_add(1); + } + + butil::atomic<int64_t> list_names_count; + butil::atomic<int64_t> touch_count; +}; + +TEST(NamingServiceTest, consul_with_backup_file) { + brpc::policy::FLAGS_consul_enable_degrade_to_file_naming_service = true; + const char *address_list[] = { + "10.127.0.1:1234", + "10.128.0.1:1234", + "10.129.0.1:1234", + }; + butil::TempFile tmp_file; + const char * service_name = tmp_file.fname(); + { + FILE* fp = fopen(tmp_file.fname(), "w"); + for (size_t i = 0; i < ARRAY_SIZE(address_list); ++i) { + ASSERT_TRUE(fprintf(fp, "%s\n", address_list[i])); + } + fclose(fp); + } + std::cout << tmp_file.fname() << std::endl; + + std::vector<brpc::ServerNode> servers; + brpc::policy::ConsulNamingService cns; + ASSERT_EQ(0, cns.GetServers(service_name, &servers)); + ASSERT_EQ(ARRAY_SIZE(address_list), servers.size()); + for (size_t i = 0; i < ARRAY_SIZE(address_list); ++i) { + std::ostringstream oss; + oss << servers[i]; + ASSERT_EQ(address_list[i], oss.str()) << "i=" << i; + } + + brpc::Server server; + ConsulNamingServiceImpl svc; + std::string restful_map(brpc::policy::FLAGS_consul_service_discovery_url); + restful_map.append("/"); + restful_map.append(service_name); + restful_map.append(" => ListNames"); + ASSERT_EQ(0, server.AddService(&svc, + brpc::SERVER_DOESNT_OWN_SERVICE, + restful_map.c_str())); + ASSERT_EQ(0, server.Start("localhost:8500", NULL)); + + bthread_usleep(100000); + + butil::EndPoint n1; + ASSERT_EQ(0, butil::str2endpoint("10.121.36.189:8003", &n1)); + butil::EndPoint n2; + ASSERT_EQ(0, butil::str2endpoint("10.121.36.190:8003", &n2)); + std::vector<brpc::ServerNode> expected_servers; + expected_servers.push_back(brpc::ServerNode(n1, "1")); + expected_servers.push_back(brpc::ServerNode(n2, "2")); + std::sort(expected_servers.begin(), expected_servers.end()); + + servers.clear(); + ASSERT_EQ(0, cns.GetServers(service_name, &servers)); + ASSERT_EQ(expected_servers.size(), servers.size()); + std::sort(servers.begin(), servers.end()); + for (size_t i = 0; i < expected_servers.size(); ++i) { + ASSERT_EQ(expected_servers[i], servers[i]); + } +} + } //namespace