Commit e784526a authored by jamesge's avatar jamesge

Support multiple discovery nodes

parent 12e73e1d
......@@ -48,12 +48,32 @@ DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyon
static pthread_once_t s_init_discovery_channel_once = PTHREAD_ONCE_INIT;
static Channel* s_discovery_channel = NULL;
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
static int ListDiscoveryNodes(const char* discovery_api_addr, std::string* servers) {
Channel api_channel;
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(discovery_api_addr, "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << discovery_api_addr;
return -1;
}
Controller cntl;
cntl.http_request().uri() = discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(FATAL) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return -1;
}
servers->assign("list://");
const std::string response = cntl.response_attachment().to_string();
BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string();
d.Parse(response.c_str());
if (!d.IsObject()) {
LOG(ERROR) << "Fail to parse " << buf << " as json object";
LOG(ERROR) << "Fail to parse " << response << " as json object";
return -1;
}
auto itr = d.FindMember("data");
......@@ -77,50 +97,39 @@ int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
itr_status->value.GetUint() != 0) {
continue;
}
server_addr->assign(itr_addr->value.GetString(),
itr_addr->value.GetStringLength());
// Currently, we just use the first successful result
break;
servers->push_back(',');
servers->append(itr_addr->value.GetString(),
itr_addr->value.GetStringLength());
}
return 0;
}
static void InitDiscoveryChannel() {
Channel api_channel;
static void NewDiscoveryChannel() {
// NOTE: Newly added discovery server is NOT detected until this server
// is restarted. The reasons for this design is that NS cluster rarely
// changes. Although we could detect new discovery servers by implmenenting
// a NamingService, however which is too heavy for solving such a rare case.
std::string discovery_servers;
if (ListDiscoveryNodes(FLAGS_discovery_api_addr.c_str(), &discovery_servers) != 0) {
LOG(ERROR) << "Fail to get discovery nodes from " << FLAGS_discovery_api_addr;
return;
}
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
if (api_channel.Init(FLAGS_discovery_api_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << FLAGS_discovery_api_addr;
return;
}
Controller cntl;
cntl.http_request().uri() = FLAGS_discovery_api_addr;
api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(FATAL) << "Fail to access " << cntl.http_request().uri()
<< ": " << cntl.ErrorText();
return;
}
std::string discovery_addr;
if (ParseNodesResult(cntl.response_attachment(), &discovery_addr) != 0) {
LOG(FATAL) << "Fail to parse nodes result from discovery api server";
return;
}
s_discovery_channel = new Channel;
if (s_discovery_channel->Init(discovery_addr.c_str(), "", &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << discovery_addr;
if (s_discovery_channel->Init(discovery_servers.c_str(), "rr", &channel_options) != 0) {
LOG(ERROR) << "Fail to init channel to " << discovery_servers;
return;
}
}
Channel* GetDiscoveryChannel() {
pthread_once(&s_init_discovery_channel_once, InitDiscoveryChannel);
inline Channel* GetOrNewDiscoveryChannel() {
pthread_once(&s_init_discovery_channel_once, NewDiscoveryChannel);
return s_discovery_channel;
}
bool DiscoveryRegisterParam::IsValid() const {
return !appid.empty() && !hostname.empty() && !addrs.empty() &&
!env.empty() && !zone.empty() && !version.empty();
......@@ -138,7 +147,7 @@ DiscoveryClient::~DiscoveryClient() {
}
}
int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
static int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
const std::string s = buf.to_string();
BUTIL_RAPIDJSON_NAMESPACE::Document d;
d.Parse(s.c_str());
......@@ -161,6 +170,17 @@ int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
}
int DiscoveryClient::DoRenew() const {
// May create short connections which are OK.
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
Channel chan;
if (chan.Init(_current_discovery_server, &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << _current_discovery_server;
return -1;
}
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/renew";
......@@ -172,7 +192,7 @@ int DiscoveryClient::DoRenew() const {
<< "&region=" << _params.region
<< "&zone=" << _params.zone;
os.move_to(cntl.request_attachment());
GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
chan.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText();
return -1;
......@@ -219,15 +239,15 @@ void* DiscoveryClient::PeriodicRenew(void* arg) {
return NULL;
}
int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
int DiscoveryClient::Register(const DiscoveryRegisterParam& params) {
if (_registered.load(butil::memory_order_relaxed) ||
_registered.exchange(true, butil::memory_order_release)) {
return 0;
}
if (!req.IsValid()) {
if (!params.IsValid()) {
return -1;
}
_params = req;
_params = params;
if (DoRegister() != 0) {
return -1;
......@@ -239,7 +259,12 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
return 0;
}
int DiscoveryClient::DoRegister() const {
int DiscoveryClient::DoRegister() {
Channel* chan = GetOrNewDiscoveryChannel();
if (NULL == chan) {
LOG(ERROR) << "Fail to create discovery channel";
return -1;
}
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/register";
......@@ -255,7 +280,7 @@ int DiscoveryClient::DoRegister() const {
<< "&version=" << _params.version
<< "&metadata=" << _params.metadata;
os.move_to(cntl.request_attachment());
GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
chan->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to register " << _params.appid << ": " << cntl.ErrorText();
return -1;
......@@ -266,10 +291,22 @@ int DiscoveryClient::DoRegister() const {
<< ": " << error_text;
return -1;
}
_current_discovery_server = cntl.remote_side();
return 0;
}
int DiscoveryClient::DoCancel() const {
// May create short connections which are OK.
ChannelOptions channel_options;
channel_options.protocol = PROTOCOL_HTTP;
channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
Channel chan;
if (chan.Init(_current_discovery_server, &channel_options) != 0) {
LOG(FATAL) << "Fail to init channel to " << _current_discovery_server;
return -1;
}
Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/cancel";
......@@ -281,7 +318,7 @@ int DiscoveryClient::DoCancel() const {
<< "&region=" << _params.region
<< "&zone=" << _params.zone;
os.move_to(cntl.request_attachment());
GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
chan.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText();
return -1;
......@@ -305,12 +342,16 @@ int DiscoveryNamingService::GetServers(const char* service_name,
LOG_ONCE(ERROR) << "Invalid parameters";
return -1;
}
Channel* chan = GetOrNewDiscoveryChannel();
if (NULL == chan) {
LOG(ERROR) << "Fail to create discovery channel";
}
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
GetDiscoveryChannel()->CallMethod(NULL, &cntl, NULL, NULL, NULL);
chan->CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText();
return -1;
......
......@@ -44,24 +44,28 @@ struct DiscoveryRegisterParam {
// ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple service instances to register,
// you need multiple DiscoveryClient.
// Note: Unregister is automatically called in dtor.
// Note: Cancel to the server is automatically called in dtor.
class DiscoveryClient {
public:
DiscoveryClient();
~DiscoveryClient();
// Initialize this client.
// Returns 0 on success.
// NOTE: Calling more than once does nothing and returns 0.
int Register(const DiscoveryRegisterParam& req);
private:
static void* PeriodicRenew(void* arg);
int DoCancel() const;
int DoRegister() const;
int DoRegister();
int DoRenew() const;
private:
bthread_t _th;
butil::atomic<bool> _registered;
DiscoveryRegisterParam _params;
butil::EndPoint _current_discovery_server;
};
class DiscoveryNamingService : public PeriodicNamingService {
......
......@@ -46,9 +46,6 @@ DECLARE_string(discovery_api_addr);
DECLARE_string(discovery_env);
DECLARE_int32(discovery_renew_interval_s);
// Defined in discovery_naming_service.cpp
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr);
} // policy
} // brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment