Commit 770cc50d authored by zhujiashun's avatar zhujiashun

hide cancel() from user

parent 7129b3c5
......@@ -129,7 +129,6 @@ static void InitChannel() {
}
}
int ParseFetchsResult(const butil::IOBuf& buf,
const char* service_name,
std::vector<ServerNode>* servers) {
......@@ -201,26 +200,24 @@ int ParseFetchsResult(const butil::IOBuf& buf,
}
bool DiscoveryRegisterParam::IsValid() const {
if (appid.empty() || hostname.empty() || addrs.empty() ||
env.empty() || zone.empty() || version.empty()) {
return false;
}
return true;
return !appid.empty() && !hostname.empty() && !addrs.empty() &&
!env.empty() && !zone.empty() && !version.empty();
}
bool DiscoveryFetchsParam::IsValid() const {
if (appid.empty() || env.empty() || status.empty()) {
return false;
}
return true;
return !appid.empty() && !env.empty() && !status.empty();
}
DiscoveryClient::DiscoveryClient()
: _th(INVALID_BTHREAD)
, _state(INIT) {}
, _registered(false) {}
DiscoveryClient::~DiscoveryClient() {
Cancel();
if (_registered.load(butil::memory_order_acquire)) {
bthread_stop(_th);
bthread_join(_th, NULL);
DoCancel();
}
}
int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
......@@ -284,23 +281,7 @@ void* DiscoveryClient::PeriodicRenew(void* arg) {
while (!bthread_stopped(bthread_self())) {
if (consecutive_renew_error == FLAGS_discovery_reregister_threshold) {
LOG(WARNING) << "Reregister since discovery renew error threshold reached";
{
std::unique_lock<butil::Mutex> mu(d->_mutex);
switch (d->_state) {
case INIT:
CHECK(false) << "Impossible";
return NULL;
case REGISTERING:
case REGISTERED:
break;
case CANCELED:
return NULL;
default:
CHECK(false) << "Impossible";
return NULL;
}
}
LOG(WARNING) << "Re-register since discovery renew error threshold reached";
// Do register until succeed or Cancel is called
while (!bthread_stopped(bthread_self())) {
if (d->DoRegister() == 0) {
......@@ -310,17 +291,12 @@ void* DiscoveryClient::PeriodicRenew(void* arg) {
}
consecutive_renew_error = 0;
}
if (d->DoRenew() != 0) {
consecutive_renew_error++;
continue;
}
consecutive_renew_error = 0;
if (bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000) != 0) {
if (errno == ESTOP) {
break;
}
}
bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
}
return NULL;
}
......@@ -329,24 +305,9 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
if (!req.IsValid()) {
return -1;
}
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
_state = REGISTERING;
break;
case REGISTERING:
case REGISTERED:
LOG(WARNING) << "Discovery Appid=" << req.appid
<<" is registering or registered";
return 0;
case CANCELED:
LOG(ERROR) << "Discovery Appid=" << req.appid << " is canceled";
return -1;
default:
CHECK(false) << "Impossible";
return -1;
}
if (_registered.load(butil::memory_order_relaxed) ||
_registered.exchange(true, butil::memory_order_release)) {
return 0;
}
pthread_once(&s_init_channel_once, InitChannel);
_appid = req.appid;
......@@ -366,32 +327,6 @@ int DiscoveryClient::Register(const DiscoveryRegisterParam& req) {
LOG(ERROR) << "Fail to start background PeriodicRenew";
return -1;
}
bool is_canceled = false;
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
CHECK(false) << "Impossible";
return -1;
case REGISTERING:
_state = REGISTERED;
break;
case REGISTERED:
CHECK(false) << "Impossible";
return -1;
case CANCELED:
is_canceled = true;
break;
default:
CHECK(false) << "Impossible";
return -1;
}
}
if (is_canceled) {
bthread_stop(_th);
bthread_join(_th, NULL);
return DoCancel();
}
return 0;
}
......@@ -425,30 +360,6 @@ int DiscoveryClient::DoRegister() const {
return 0;
}
int DiscoveryClient::Cancel() {
{
std::unique_lock<butil::Mutex> mu(_mutex);
switch (_state) {
case INIT:
case REGISTERING:
_state = CANCELED;
return 0;
case REGISTERED:
_state = CANCELED;
break;
case CANCELED:
return 0;
default:
CHECK(false) << "Impossible";
return -1;
}
}
CHECK_NE(_th, INVALID_BTHREAD);
bthread_stop(_th);
bthread_join(_th, NULL);
return DoCancel();
}
int DiscoveryClient::DoCancel() const {
pthread_once(&s_init_channel_once, InitChannel);
Controller cntl;
......@@ -476,7 +387,7 @@ int DiscoveryClient::DoCancel() const {
}
int DiscoveryClient::Fetchs(const DiscoveryFetchsParam& req,
std::vector<ServerNode>* servers) {
std::vector<ServerNode>* servers) const {
if (!req.IsValid()) {
return false;
}
......
......@@ -49,14 +49,14 @@ struct DiscoveryFetchsParam {
// ONE DiscoveryClient corresponds to ONE service instance.
// If your program has multiple service instances to register,
// you need multiple DiscoveryClient.
// Note: Unregister is automatically called in dtor.
class DiscoveryClient {
public:
DiscoveryClient();
~DiscoveryClient();
int Register(const DiscoveryRegisterParam& req);
int Cancel();
int Fetchs(const DiscoveryFetchsParam& req, std::vector<ServerNode>* servers);
int Fetchs(const DiscoveryFetchsParam& req, std::vector<ServerNode>* servers) const;
private:
static void* PeriodicRenew(void* arg);
......@@ -65,15 +65,8 @@ private:
int DoRenew() const;
private:
enum State {
INIT,
REGISTERING,
REGISTERED,
CANCELED
};
bthread_t _th;
State _state;
butil::Mutex _mutex;
butil::atomic<bool> _registered;
std::string _appid;
std::string _hostname;
std::string _addrs;
......
......@@ -626,7 +626,6 @@ TEST(NamingServiceTest, discovery_sanity) {
ASSERT_EQ(0, dcns.GetServers("admin.test", &servers));
ASSERT_EQ((size_t)1, servers.size());
brpc::policy::DiscoveryClient dc;
brpc::policy::DiscoveryRegisterParam dparam;
dparam.appid = "main.test";
dparam.hostname = "hostname";
......@@ -635,24 +634,21 @@ TEST(NamingServiceTest, discovery_sanity) {
dparam.zone = "sh001";
dparam.status = 1;
dparam.version = "v1";
ASSERT_EQ(0, dc.Register(dparam));
bthread_usleep(1000000);
ASSERT_EQ(0, dc.Cancel());
ASSERT_GT(svc.RenewCount(), 0);
ASSERT_EQ(svc.CancelCount(), 1);
brpc::policy::DiscoveryClient dc2;
ASSERT_EQ(0, dc2.Cancel());
ASSERT_EQ(-1, dc2.Register(dparam));
{
brpc::policy::DiscoveryClient dc3;
ASSERT_EQ(0, dc3.Register(dparam));
ASSERT_EQ(0, dc3.Cancel());
brpc::policy::DiscoveryClient dc;
}
// Dtor of DiscoveryClient also calls Cancel(), we need to ensure that
// Cancel() is called only once. One is from dc1, the other is from dc3.
ASSERT_EQ(svc.CancelCount(), 2);
// Cancel is called iff Register is called
ASSERT_EQ(svc.CancelCount(), 0);
{
brpc::policy::DiscoveryClient dc;
// Two Register should start one Renew task , and make
// svc.RenewCount() be one.
ASSERT_EQ(0, dc.Register(dparam));
ASSERT_EQ(0, dc.Register(dparam));
bthread_usleep(1000000);
}
ASSERT_EQ(svc.RenewCount(), 1);
ASSERT_EQ(svc.CancelCount(), 1);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment