Commit ec741437 authored by zhangyaofu's avatar zhangyaofu

review comments fix

parent d24c9b69
...@@ -17,7 +17,10 @@ ...@@ -17,7 +17,10 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <string> // std::string #include <string> // std::string
#include <set> // std::set #include <set> // std::set
#include "butil/string_printf.h"
#include "butil/third_party/rapidjson/document.h" #include "butil/third_party/rapidjson/document.h"
#include "butil/third_party/rapidjson/stringbuffer.h"
#include "butil/third_party/rapidjson/prettywriter.h"
#include "butil/time/time.h" #include "butil/time/time.h"
#include "bthread/bthread.h" #include "bthread/bthread.h"
#include "brpc/log.h" #include "brpc/log.h"
...@@ -45,13 +48,20 @@ DEFINE_bool(consul_enable_degrade_to_file_naming_service, false, ...@@ -45,13 +48,20 @@ DEFINE_bool(consul_enable_degrade_to_file_naming_service, false,
DEFINE_string(consul_file_naming_service_dir, "", DEFINE_string(consul_file_naming_service_dir, "",
"When it degraded to file naming service, the file with name of the " "When it degraded to file naming service, the file with name of the "
"service name will be searched in this dir to use."); "service name will be searched in this dir to use.");
DEFINE_int32(consul_retry_interval_ms, 5, DEFINE_int32(consul_retry_interval_ms, 50,
"Wait so many milliseconds before retry when error happens"); "Wait so many milliseconds before retry when error happens");
constexpr char kConsulIndex[] = "X-Consul-Index"; constexpr char kConsulIndex[] = "X-Consul-Index";
int ConsulNamingService::DegradeToFilenamingServiceIfNeed(const char* service_name, std::string RapidjsonValueToString(const rapidjson::Value& value) {
std::vector<ServerNode>* servers) { rapidjson::StringBuffer buffer;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
value.Accept(writer);
return buffer.GetString();
}
int ConsulNamingService::DegradeToOtherServiceIfNeed(const char* service_name,
std::vector<ServerNode>* servers) {
if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) { if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) {
_backup_file_loaded = true; _backup_file_loaded = true;
const std::string file(FLAGS_consul_file_naming_service_dir + service_name); const std::string file(FLAGS_consul_file_naming_service_dir + service_name);
...@@ -71,7 +81,7 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -71,7 +81,7 @@ int ConsulNamingService::GetServers(const char* service_name,
opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond; opt.timeout_ms = (FLAGS_consul_blocking_query_wait_secs + 10) * butil::Time::kMillisecondsPerSecond;
if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) { if (_channel.Init(FLAGS_consul_agent_addr.c_str(), "rr", &opt) != 0) {
LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr; LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr;
return DegradeToFilenamingServiceIfNeed(service_name, servers); return DegradeToOtherServiceIfNeed(service_name, servers);
} }
_consul_connected = true; _consul_connected = true;
} }
...@@ -85,25 +95,23 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -85,25 +95,23 @@ int ConsulNamingService::GetServers(const char* service_name,
servers->clear(); servers->clear();
std::string consul_url(_consul_url); std::string consul_url(_consul_url);
if (!_consul_index.empty()) { if (!_consul_index.empty()) {
consul_url.append("&index="); butil::string_appendf(&consul_url, "&index=%s&wait=%ds", _consul_index.c_str(),
consul_url.append(_consul_index); FLAGS_consul_blocking_query_wait_secs);
consul_url.append("&wait=");
consul_url.append(std::to_string(FLAGS_consul_blocking_query_wait_secs));
consul_url.push_back('s');
} }
Controller cntl; Controller cntl;
cntl.http_request().uri() = consul_url; cntl.http_request().uri() = consul_url;
_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); _channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to init channel to consul at " << FLAGS_consul_agent_addr; LOG(ERROR) << "Fail to access " << consul_url << ": "
return DegradeToFilenamingServiceIfNeed(service_name, servers); << cntl.ErrorText();
return DegradeToOtherServiceIfNeed(service_name, servers);
} }
const std::string* index = cntl.http_response().GetHeader(kConsulIndex); const std::string* index = cntl.http_response().GetHeader(kConsulIndex);
if (index != nullptr) { if (index != nullptr) {
if (*index == _consul_index) { if (*index == _consul_index) {
LOG_EVERY_N(ERROR, 100) << "There is no service changed for the list of " LOG_EVERY_N(INFO, 100) << "There is no service changed for the list of "
<< service_name << service_name
<< ", consul_index: " << _consul_index; << ", consul_index: " << _consul_index;
return -1; return -1;
...@@ -121,35 +129,45 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -121,35 +129,45 @@ int ConsulNamingService::GetServers(const char* service_name,
rapidjson::Document services; rapidjson::Document services;
services.Parse(cntl.response_attachment().to_string().c_str()); services.Parse(cntl.response_attachment().to_string().c_str());
if (!services.IsArray()) { if (!services.IsArray()) {
LOG(ERROR) << "The consul's response for "
<< service_name << " is not a json array";
return -1; return -1;
} }
for (rapidjson::SizeType i = 0; i < services.Size(); ++i) { for (rapidjson::SizeType i = 0; i < services.Size(); ++i) {
if (!services[i].HasMember("Service")) { if (!services[i].HasMember("Service")) {
LOG(ERROR) << "No service info in node: "
<< RapidjsonValueToString(services[i]);
continue; continue;
} }
if (!services[i]["Service"].HasMember("Address") ||
!services[i]["Service"]["Address"].IsString() || const rapidjson::Value& service = services[i]["Service"];
!services[i]["Service"].HasMember("Port") || if (!service.HasMember("Address") ||
!services[i]["Service"]["Port"].IsUint()) { !service["Address"].IsString() ||
!service.HasMember("Port") ||
!service["Port"].IsUint()) {
LOG(ERROR) << "Invalid service: "
<< RapidjsonValueToString(service);
continue; continue;
} }
butil::EndPoint end_point; butil::EndPoint end_point;
if (str2endpoint(services[i]["Service"]["Address"].GetString(), if (str2endpoint(service["Address"].GetString(),
services[i]["Service"]["Port"].GetUint(), service["Port"].GetUint(),
&end_point) != 0) { &end_point) != 0) {
LOG(ERROR) << "Invalid address=`" << services[i]["Service"]["Address"].GetString() << '\'' LOG(ERROR) << "Service with illegal address or port: "
<< " , port= " << services[i]["Service"]["Port"].GetUint(); << RapidjsonValueToString(service);
continue; continue;
} }
ServerNode node; ServerNode node;
node.addr = end_point; node.addr = end_point;
// Tags in consul is an array, here we just use the first one. // Tags in consul is an array, here we only use the first one.
if (services[i]["Service"].HasMember("Tags") && if (service.HasMember("Tags") &&
services[i]["Service"]["Tags"].IsArray() && service["Tags"].IsArray() &&
services[i]["Service"]["Tags"].Size() > 0 && service["Tags"].Size() > 0 &&
services[i]["Service"]["Tags"][0].IsString()) { service["Tags"][0].IsString()) {
node.tag = services[i]["Service"]["Tags"][0].GetString(); node.tag = service["Tags"][0].GetString();
} }
if (presence.insert(node).second) { if (presence.insert(node).second) {
...@@ -161,6 +179,12 @@ int ConsulNamingService::GetServers(const char* service_name, ...@@ -161,6 +179,12 @@ int ConsulNamingService::GetServers(const char* service_name,
_consul_index = *index; _consul_index = *index;
if (servers->empty() && !services.Empty()) {
LOG(ERROR) << "All service about " << service_name
<< " from consul is invalid, refuse to update servers";
return -1;
}
RPC_VLOG << "Got " << servers->size() RPC_VLOG << "Got " << servers->size()
<< (servers->size() > 1 ? " servers" : " server") << (servers->size() > 1 ? " servers" : " server")
<< " from " << service_name; << " from " << service_name;
......
...@@ -37,8 +37,8 @@ private: ...@@ -37,8 +37,8 @@ private:
NamingService* New() const; NamingService* New() const;
int DegradeToFilenamingServiceIfNeed(const char* service_name, int DegradeToOtherServiceIfNeed(const char* service_name,
std::vector<ServerNode>* servers); std::vector<ServerNode>* servers);
void Destroy(); void Destroy();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment