Unverified Commit b1fb5772 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #274 from ZhangYaoFu/consul_naming_service

add consul naming service
parents 1daddbe7 50d26f74
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2014 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
// //
// http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "brpc/policy/list_naming_service.h" #include "brpc/policy/list_naming_service.h"
#include "brpc/policy/domain_naming_service.h" #include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h" #include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
// Load Balancers // Load Balancers
#include "brpc/policy/round_robin_load_balancer.h" #include "brpc/policy/round_robin_load_balancer.h"
...@@ -105,6 +106,7 @@ struct GlobalExtensions { ...@@ -105,6 +106,7 @@ struct GlobalExtensions {
ListNamingService lns; ListNamingService lns;
DomainNamingService dns; DomainNamingService dns;
RemoteFileNamingService rfns; RemoteFileNamingService rfns;
ConsulNamingService cns;
RoundRobinLoadBalancer rr_lb; RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb; WeightedRoundRobinLoadBalancer wrr_lb;
...@@ -163,7 +165,7 @@ extern butil::static_atomic<int> g_running_server_count; ...@@ -163,7 +165,7 @@ extern butil::static_atomic<int> g_running_server_count;
static int GetRunningServerCount(void*) { static int GetRunningServerCount(void*) {
return g_running_server_count.load(butil::memory_order_relaxed); return g_running_server_count.load(butil::memory_order_relaxed);
} }
// Update global stuff periodically. // Update global stuff periodically.
static void* GlobalUpdate(void*) { static void* GlobalUpdate(void*) {
// Expose variables. // Expose variables.
...@@ -180,7 +182,7 @@ static void* GlobalUpdate(void*) { ...@@ -180,7 +182,7 @@ static void* GlobalUpdate(void*) {
"iobuf_block_memory", GetIOBufBlockMemory, NULL); "iobuf_block_memory", GetIOBufBlockMemory, NULL);
bvar::PassiveStatus<int> var_running_server_count( bvar::PassiveStatus<int> var_running_server_count(
"rpc_server_count", GetRunningServerCount, NULL); "rpc_server_count", GetRunningServerCount, NULL);
butil::FileWatcher fw; butil::FileWatcher fw;
if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) { if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) {
LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'"; LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'";
...@@ -201,16 +203,16 @@ static void* GlobalUpdate(void*) { ...@@ -201,16 +203,16 @@ static void* GlobalUpdate(void*) {
break; break;
} }
consecutive_nosleep = 0; consecutive_nosleep = 0;
} else { } else {
if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) { if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
consecutive_nosleep = 0; consecutive_nosleep = 0;
LOG(WARNING) << __FUNCTION__ << " is too busy!"; LOG(WARNING) << __FUNCTION__ << " is too busy!";
} }
} }
last_time_us = butil::gettimeofday_us(); last_time_us = butil::gettimeofday_us();
TrackMe(); TrackMe();
if (!IsDummyServerRunning() if (!IsDummyServerRunning()
&& g_running_server_count.load(butil::memory_order_relaxed) == 0 && g_running_server_count.load(butil::memory_order_relaxed) == 0
&& fw.check_and_consume() > 0) { && fw.check_and_consume() > 0) {
...@@ -279,14 +281,14 @@ static void GlobalInitializeOrDieImpl() { ...@@ -279,14 +281,14 @@ static void GlobalInitializeOrDieImpl() {
// may be called before main() only seeing gflags with default // // may be called before main() only seeing gflags with default //
// values even if the gflags will be set after main(). // // values even if the gflags will be set after main(). //
////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////
// Ignore SIGPIPE. // Ignore SIGPIPE.
struct sigaction oldact; struct sigaction oldact;
if (sigaction(SIGPIPE, NULL, &oldact) != 0 || if (sigaction(SIGPIPE, NULL, &oldact) != 0 ||
(oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) { (oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) {
CHECK(NULL == signal(SIGPIPE, SIG_IGN)); CHECK(NULL == signal(SIGPIPE, SIG_IGN));
} }
// Make GOOGLE_LOG print to comlog device // Make GOOGLE_LOG print to comlog device
SetLogHandler(&BaiduStreamingLogHandler); SetLogHandler(&BaiduStreamingLogHandler);
...@@ -303,7 +305,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -303,7 +305,7 @@ static void GlobalInitializeOrDieImpl() {
// Defined in http_rpc_protocol.cpp // Defined in http_rpc_protocol.cpp
InitCommonStrings(); InitCommonStrings();
// Leave memory of these extensions to process's clean up. // Leave memory of these extensions to process's clean up.
g_ext = new(std::nothrow) GlobalExtensions(); g_ext = new(std::nothrow) GlobalExtensions();
if (NULL == g_ext) { if (NULL == g_ext) {
...@@ -317,6 +319,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -317,6 +319,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns); NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns);
NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns); NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns); NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
// Load Balancers // Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb); LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
...@@ -355,7 +358,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -355,7 +358,7 @@ static void GlobalInitializeOrDieImpl() {
} }
Protocol streaming_protocol = { ParseStreamingMessage, Protocol streaming_protocol = { ParseStreamingMessage,
NULL, NULL, ProcessStreamingMessage, NULL, NULL, ProcessStreamingMessage,
ProcessStreamingMessage, ProcessStreamingMessage,
NULL, NULL, NULL, NULL, NULL, NULL,
CONNECTION_TYPE_SINGLE, "streaming_rpc" }; CONNECTION_TYPE_SINGLE, "streaming_rpc" };
...@@ -363,7 +366,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -363,7 +366,7 @@ static void GlobalInitializeOrDieImpl() {
if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) { if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) {
exit(1); exit(1);
} }
Protocol http_protocol = { ParseHttpMessage, Protocol http_protocol = { ParseHttpMessage,
SerializeHttpRequest, PackHttpRequest, SerializeHttpRequest, PackHttpRequest,
ProcessHttpRequest, ProcessHttpResponse, ProcessHttpRequest, ProcessHttpResponse,
...@@ -374,7 +377,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -374,7 +377,7 @@ static void GlobalInitializeOrDieImpl() {
if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) { if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) {
exit(1); exit(1);
} }
Protocol hulu_protocol = { ParseHuluMessage, Protocol hulu_protocol = { ParseHuluMessage,
SerializeRequestDefault, PackHuluRequest, SerializeRequestDefault, PackHuluRequest,
ProcessHuluRequest, ProcessHuluResponse, ProcessHuluRequest, ProcessHuluResponse,
...@@ -417,7 +420,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -417,7 +420,7 @@ static void GlobalInitializeOrDieImpl() {
exit(1); exit(1);
} }
// Only valid at server side. We generalize all the protocols that // Only valid at server side. We generalize all the protocols that
// prefixes with nshead as `nshead_protocol' and specify the content // prefixes with nshead as `nshead_protocol' and specify the content
// parsing after nshead by ServerOptions.nshead_service. // parsing after nshead by ServerOptions.nshead_service.
Protocol nshead_protocol = { ParseNsheadMessage, Protocol nshead_protocol = { ParseNsheadMessage,
...@@ -488,7 +491,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -488,7 +491,7 @@ static void GlobalInitializeOrDieImpl() {
if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) { if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) {
exit(1); exit(1);
} }
Protocol rtmp_protocol = { Protocol rtmp_protocol = {
ParseRtmpMessage, ParseRtmpMessage,
SerializeRtmpRequest, PackRtmpRequest, SerializeRtmpRequest, PackRtmpRequest,
...@@ -536,7 +539,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -536,7 +539,7 @@ static void GlobalInitializeOrDieImpl() {
InitUserCodeBackupPoolOnceOrDie(); InitUserCodeBackupPoolOnceOrDie();
} }
// We never join GlobalUpdate, let it quit with the process. // We never join GlobalUpdate, let it quit with the process.
bthread_t th; bthread_t th;
CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0) CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0)
<< "Fail to start GlobalUpdate"; << "Fail to start GlobalUpdate";
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Yaofu Zhang (zhangyaofu@qiyi.com)
#include <gflags/gflags.h>
#include <string> // std::string
#include <set> // std::set
#include "butil/string_printf.h"
#include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/time/time.h"
#include "bthread/bthread.h"
#include "brpc/log.h"
#include "brpc/channel.h"
#include "brpc/policy/file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
namespace brpc {
namespace policy {
DEFINE_string(consul_agent_addr, "http://127.0.0.1:8500",
"The query string of request consul for discovering service.");
DEFINE_string(consul_service_discovery_url,
"/v1/health/service/",
"The url of consul for discovering service.");
DEFINE_string(consul_url_parameter, "?stale&passing",
"The query string of request consul for discovering service.");
DEFINE_int32(consul_connect_timeout_ms, 200,
"Timeout for creating connections to consul in milliseconds");
DEFINE_int32(consul_blocking_query_wait_secs, 60,
"Maximum duration for the blocking request in secs.");
DEFINE_bool(consul_enable_degrade_to_file_naming_service, false,
"Use local backup file when consul cannot connect");
DEFINE_string(consul_file_naming_service_dir, "",
"When it degraded to file naming service, the file with name of the "
"service name will be searched in this dir to use.");
DEFINE_int32(consul_retry_interval_ms, 500,
"Wait so many milliseconds before retry when error happens");
constexpr char kConsulIndex[] = "X-Consul-Index";
std::string RapidjsonValueToString(const rapidjson::Value& value) {
rapidjson::StringBuffer buffer;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
value.Accept(writer);
return buffer.GetString();
}
int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
_backup_file_loaded = true;
const std::string file(FLAGS_consul_file_naming_service_dir + service_name);
LOG(INFO) << "Load server list from " << file;
FileNamingService fns;
return fns.GetServers(file.c_str(), servers);
}
return -1;
}
int ConsulNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
if (!_consul_connected) {
ChannelOptions opt;
opt.protocol = PROTOCOL_HTTP;
opt.connect_timeout_ms = FLAGS_consul_connect_timeout_ms;
opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond;
if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) {
LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr;
return DegradeToOtherServiceIfNeeded(service_name, servers);
}
_consul_connected = true;
}
if (_consul_url.empty()) {
_consul_url.append(FLAGS_consul_service_discovery_url);
_consul_url.append(service_name);
_consul_url.append(FLAGS_consul_url_parameter);
}
servers->clear();
std::string consul_url(_consul_url);
if (!_consul_index.empty()) {
butil::string_appendf(&consul_url, "&index=%s&wait=%ds", _consul_index.c_str(),
FLAGS_consul_blocking_query_wait_secs);
}
Controller cntl;
cntl.http_request().uri() = consul_url;
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access " << consul_url << ": "
<< cntl.ErrorText();
return DegradeToOtherServiceIfNeeded(service_name, servers);
}
const std::string* index = cntl.http_response().GetHeader(kConsulIndex);
if (index != nullptr) {
if (*index == _consul_index) {
LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
<< service_name
<< ", consul_index: " << _consul_index;
return -1;
}
} else {
LOG(ERROR) << "Failed to parse consul index of " << service_name << ".";
return -1;
}
// Sort/unique the inserted vector is faster, but may have a different order
// of addresses from the file. To make assertions in tests easier, we use
// set to de-duplicate and keep the order.
std::set<ServerNode> presence;
rapidjson::Document services;
services.Parse(cntl.response_attachment().to_string().c_str());
if (!services.IsArray()) {
LOG(ERROR) << "The consul's response for "
<< service_name << " is not a json array";
return -1;
}
for (rapidjson::SizeType i = 0; i < services.Size(); ++i) {
if (!services[i].HasMember("Service")) {
LOG(ERROR) << "No service info in node: "
<< RapidjsonValueToString(services[i]);
continue;
}
const rapidjson::Value& service = services[i]["Service"];
if (!service.HasMember("Address") ||
!service["Address"].IsString() ||
!service.HasMember("Port") ||
!service["Port"].IsUint()) {
LOG(ERROR) << "Service with no valid address or port: "
<< RapidjsonValueToString(service);
continue;
}
butil::EndPoint end_point;
if (str2endpoint(service["Address"].GetString(),
service["Port"].GetUint(),
&end_point) != 0) {
LOG(ERROR) << "Service with illegal address or port: "
<< RapidjsonValueToString(service);
continue;
}
ServerNode node;
node.addr = end_point;
if (service.HasMember("Tags")) {
if (service["Tags"].IsArray()) {
if (service["Tags"].Size() > 0) {
// Tags in consul is an array, here we only use the first one.
if (service["Tags"][0].IsString()) {
node.tag = service["Tags"][0].GetString();
} else {
LOG(ERROR) << "First tag returned by consul is not string, service: "
<< RapidjsonValueToString(service);
continue;
}
}
} else {
LOG(ERROR) << "Service tags returned by consul is not json array, service: "
<< RapidjsonValueToString(service);
continue;
}
}
if (presence.insert(node).second) {
servers->push_back(node);
} else {
RPC_VLOG << "Duplicated server=" << node;
}
}
_consul_index = *index;
if (servers->empty() && !services.Empty()) {
LOG(ERROR) << "All service about " << service_name
<< " from consul is invalid, refuse to update servers";
return -1;
}
RPC_VLOG << "Got " << servers->size()
<< (servers->size() > 1 ? " servers" : " server")
<< " from " << service_name;
return 0;
}
int ConsulNamingService::RunNamingService(const char* service_name,
NamingServiceActions* actions) {
std::vector<ServerNode> servers;
bool ever_reset = false;
for (;;) {
servers.clear();
const int rc = GetServers(service_name, &servers);
if (rc == 0) {
ever_reset = true;
actions->ResetServers(servers);
} else {
if (!ever_reset) {
// ResetServers must be called at first time even if GetServers
// failed, to wake up callers to `WaitForFirstBatchOfServers'
ever_reset = true;
servers.clear();
actions->ResetServers(servers);
}
if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMillisecondsPerSecond) < 0) {
if (errno == ESTOP) {
RPC_VLOG << "Quit NamingServiceThread=" << bthread_self();
return 0;
}
PLOG(FATAL) << "Fail to sleep";
return -1;
}
}
}
CHECK(false);
return -1;
}
void ConsulNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "consul";
return;
}
NamingService* ConsulNamingService::New() const {
return new ConsulNamingService;
}
void ConsulNamingService::Destroy() {
delete this;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Yaofu Zhang (zhangyaofu@qiyi.com)
#ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE
#define BRPC_POLICY_CONSUL_NAMING_SERVICE
#include "brpc/naming_service.h"
#include "brpc/channel.h"
namespace brpc {
class Channel;
namespace policy {
class ConsulNamingService : public NamingService {
private:
int RunNamingService(const char* service_name,
NamingServiceActions* actions);
int GetServers(const char* service_name,
std::vector<ServerNode>* servers);
void Describe(std::ostream& os, const DescribeOptions&) const;
NamingService* New() const;
int DegradeToOtherServiceIfNeeded(const char* service_name,
std::vector<ServerNode>* servers);
void Destroy();
private:
Channel _channel;
std::string _consul_index;
std::string _consul_url;
bool _backup_file_loaded = false;
bool _consul_connected = false;
};
} // namespace policy
} // namespace brpc
#endif //BRPC_POLICY_CONSUL_NAMING_SERVICE
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2014 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
// //
// http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -24,17 +24,18 @@ namespace brpc { ...@@ -24,17 +24,18 @@ namespace brpc {
namespace policy { namespace policy {
class FileNamingService : public NamingService { class FileNamingService : public NamingService {
friend class ConsulNamingService;
private: private:
int RunNamingService(const char* service_name, int RunNamingService(const char* service_name,
NamingServiceActions* actions); NamingServiceActions* actions);
int GetServers(const char *service_name, int GetServers(const char *service_name,
std::vector<ServerNode>* servers); std::vector<ServerNode>* servers);
void Describe(std::ostream& os, const DescribeOptions&) const; void Describe(std::ostream& os, const DescribeOptions&) const;
NamingService* New() const; NamingService* New() const;
void Destroy(); void Destroy();
}; };
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment