Commit 3b174fe1 authored by zhujiashun's avatar zhujiashun

move discovery getserver logic int client

parent 378d5ad2
...@@ -41,6 +41,28 @@ DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyon ...@@ -41,6 +41,28 @@ DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyon
static Channel s_discovery_channel; static Channel s_discovery_channel;
static pthread_once_t s_init_channel_once = PTHREAD_ONCE_INIT; static pthread_once_t s_init_channel_once = PTHREAD_ONCE_INIT;
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
DiscoveryFetchsParam params{
service_name, FLAGS_discovery_env, FLAGS_discovery_status};
return _client.Fetchs(params, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) { int ParseNodesResult(const butil::IOBuf& buf, std::string* server_addr) {
BUTIL_RAPIDJSON_NAMESPACE::Document d; BUTIL_RAPIDJSON_NAMESPACE::Document d;
const std::string response = buf.to_string(); const std::string response = buf.to_string();
...@@ -178,36 +200,6 @@ int ParseFetchsResult(const butil::IOBuf& buf, ...@@ -178,36 +200,6 @@ int ParseFetchsResult(const butil::IOBuf& buf,
return 0; return 0;
} }
int DiscoveryNamingService::GetServers(const char* service_name,
std::vector<ServerNode>* servers) {
pthread_once(&s_init_channel_once, InitChannel);
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to make /discovery/fetchs request: " << cntl.ErrorText();
return -1;
}
return ParseFetchsResult(cntl.response_attachment(), service_name, servers);
}
void DiscoveryNamingService::Describe(std::ostream& os,
const DescribeOptions&) const {
os << "discovery";
return;
}
NamingService* DiscoveryNamingService::New() const {
return new DiscoveryNamingService;
}
void DiscoveryNamingService::Destroy() {
delete this;
}
bool DiscoveryRegisterParam::IsValid() const { bool DiscoveryRegisterParam::IsValid() const {
if (appid.empty() || hostname.empty() || addrs.empty() || if (appid.empty() || hostname.empty() || addrs.empty() ||
env.empty() || zone.empty() || version.empty()) { env.empty() || zone.empty() || version.empty()) {
...@@ -216,6 +208,13 @@ bool DiscoveryRegisterParam::IsValid() const { ...@@ -216,6 +208,13 @@ bool DiscoveryRegisterParam::IsValid() const {
return true; return true;
} }
bool DiscoveryFetchsParam::IsValid() const {
if (appid.empty() || env.empty() || status.empty()) {
return false;
}
return true;
}
DiscoveryClient::DiscoveryClient() DiscoveryClient::DiscoveryClient()
: _th(INVALID_BTHREAD) : _th(INVALID_BTHREAD)
, _state(INIT) {} , _state(INIT) {}
...@@ -246,7 +245,7 @@ int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) { ...@@ -246,7 +245,7 @@ int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
return code; return code;
} }
int DiscoveryClient::do_renew() const { int DiscoveryClient::DoRenew() const {
Controller cntl; Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST); cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/renew"; cntl.http_request().uri() = "/discovery/renew";
...@@ -304,7 +303,7 @@ void* DiscoveryClient::PeriodicRenew(void* arg) { ...@@ -304,7 +303,7 @@ void* DiscoveryClient::PeriodicRenew(void* arg) {
} }
// Do register until succeed or Cancel is called // Do register until succeed or Cancel is called
while (!bthread_stopped(bthread_self())) { while (!bthread_stopped(bthread_self())) {
if (d->do_register() == 0) { if (d->DoRegister() == 0) {
break; break;
} }
bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000); bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
...@@ -312,7 +311,7 @@ void* DiscoveryClient::PeriodicRenew(void* arg) { ...@@ -312,7 +311,7 @@ void* DiscoveryClient::PeriodicRenew(void* arg) {
consecutive_renew_error = 0; consecutive_renew_error = 0;
} }
if (d->do_renew() != 0) { if (d->DoRenew() != 0) {
consecutive_renew_error++; consecutive_renew_error++;
continue; continue;
} }
...@@ -360,7 +359,7 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) { ...@@ -360,7 +359,7 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
_version = req.version; _version = req.version;
_metadata = req.metadata; _metadata = req.metadata;
if (do_register() != 0) { if (DoRegister() != 0) {
return -1; return -1;
} }
if (bthread_start_background(&_th, NULL, PeriodicRenew, this) != 0) { if (bthread_start_background(&_th, NULL, PeriodicRenew, this) != 0) {
...@@ -391,12 +390,12 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) { ...@@ -391,12 +390,12 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
if (is_canceled) { if (is_canceled) {
bthread_stop(_th); bthread_stop(_th);
bthread_join(_th, NULL); bthread_join(_th, NULL);
return do_cancel(); return DoCancel();
} }
return 0; return 0;
} }
int DiscoveryClient::do_register() const { int DiscoveryClient::DoRegister() const {
Controller cntl; Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST); cntl.http_request().set_method(HTTP_METHOD_POST);
cntl.http_request().uri() = "/discovery/register"; cntl.http_request().uri() = "/discovery/register";
...@@ -447,10 +446,10 @@ int DiscoveryClient::Cancel() { ...@@ -447,10 +446,10 @@ int DiscoveryClient::Cancel() {
CHECK_NE(_th, INVALID_BTHREAD); CHECK_NE(_th, INVALID_BTHREAD);
bthread_stop(_th); bthread_stop(_th);
bthread_join(_th, NULL); bthread_join(_th, NULL);
return do_cancel(); return DoCancel();
} }
int DiscoveryClient::do_cancel() const { int DiscoveryClient::DoCancel() const {
pthread_once(&s_init_channel_once, InitChannel); pthread_once(&s_init_channel_once, InitChannel);
Controller cntl; Controller cntl;
cntl.http_request().set_method(HTTP_METHOD_POST); cntl.http_request().set_method(HTTP_METHOD_POST);
...@@ -476,6 +475,24 @@ int DiscoveryClient::do_cancel() const { ...@@ -476,6 +475,24 @@ int DiscoveryClient::do_cancel() const {
return 0; return 0;
} }
int DiscoveryClient::Fetchs(const DiscoveryFetchsParam& req,
std::vector<ServerNode>* servers) {
if (!req.IsValid()) {
return false;
}
pthread_once(&s_init_channel_once, InitChannel);
servers->clear();
Controller cntl;
cntl.http_request().uri() = butil::string_printf(
"/discovery/fetchs?appid=%s&env=%s&status=%s", req.appid.c_str(),
req.env.c_str(), req.status.c_str());
s_discovery_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText();
return -1;
}
return ParseFetchsResult(cntl.response_attachment(), req.appid.c_str(), servers);
}
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -24,18 +24,6 @@ ...@@ -24,18 +24,6 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
class DiscoveryNamingService : public PeriodicNamingService {
private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers) override;
void Describe(std::ostream& os, const DescribeOptions&) const override;
NamingService* New() const override;
void Destroy() override;
};
struct DiscoveryRegisterParam { struct DiscoveryRegisterParam {
std::string appid; std::string appid;
std::string hostname; std::string hostname;
...@@ -50,6 +38,14 @@ struct DiscoveryRegisterParam { ...@@ -50,6 +38,14 @@ struct DiscoveryRegisterParam {
bool IsValid() const; bool IsValid() const;
}; };
struct DiscoveryFetchsParam {
std::string appid;
std::string env;
std::string status;
bool IsValid() const;
};
// ONE DiscoveryClient corresponds to ONE service instance. // ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple service instances to register, // If your program has multiple service instances to register,
// you need multiple DiscoveryClient. // you need multiple DiscoveryClient.
...@@ -60,12 +56,13 @@ public: ...@@ -60,12 +56,13 @@ public:
int Register(const DiscoveryRegisterParam& req); int Register(const DiscoveryRegisterParam& req);
int Cancel(); int Cancel();
int Fetchs(const DiscoveryFetchsParam& req, std::vector<ServerNode>* servers);
private: private:
static void* PeriodicRenew(void* arg); static void* PeriodicRenew(void* arg);
int do_cancel() const; int DoCancel() const;
int do_register() const; int DoRegister() const;
int do_renew() const; int DoRenew() const;
private: private:
enum State { enum State {
...@@ -88,6 +85,22 @@ private: ...@@ -88,6 +85,22 @@ private:
std::string _metadata; std::string _metadata;
}; };
class DiscoveryNamingService : public PeriodicNamingService {
private:
int GetServers(const char* service_name,
std::vector<ServerNode>* servers) override;
void Describe(std::ostream& os, const DescribeOptions&) const override;
NamingService* New() const override;
void Destroy() override;
private:
DiscoveryClient _client;
};
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment