Commit b17bcc8b authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: fix UT

parent 593082c4
......@@ -303,7 +303,7 @@ int ConsistentHashingLoadBalancer::SelectServer(
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp && rp->StopRevivingIfNecessary()) {
std::set<ServerId> server_list;
for (auto server: *s) {
server_list.insert(server.server_sock);
......@@ -313,7 +313,6 @@ int ConsistentHashingLoadBalancer::SelectServer(
if (rp->DoReject(server_list_distinct)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
std::vector<Node>::const_iterator choice =
......
......@@ -111,11 +111,10 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp && rp->StopRevivingIfNecessary()) {
if (rp->DoReject(s->server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
uint32_t stride = 0;
size_t offset = butil::fast_rand_less_than(n);
......
......@@ -111,11 +111,10 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp && rp->StopRevivingIfNecessary()) {
if (rp->DoReject(s->server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
TLS tls = s.tls();
if (tls.stride == 0) {
......
......@@ -159,7 +159,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp && rp->StopRevivingIfNecessary()) {
std::vector<ServerId> server_list;
server_list.reserve(s->server_list.size());
for (auto server: s->server_list) {
......@@ -168,7 +168,6 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (rp->DoReject(server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
TLS& tls = s.tls();
if (tls.IsNeededCaculateNewStride(s->weight_sum, s->server_list.size())) {
......
......@@ -39,17 +39,23 @@ void DefaultRevivePolicy::StartReviving() {
_reviving = true;
}
void DefaultRevivePolicy::StopRevivingIfNecessary() {
bool DefaultRevivePolicy::StopRevivingIfNecessary() {
int64_t now_ms = butil::gettimeofday_ms();
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (!_reviving) {
mu.unlock();
return false;
}
if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_time_ms)) {
_reviving = false;
_last_usable_change_time_ms = 0;
mu.unlock();
return false;
}
}
return;
return true;
}
bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) {
......
......@@ -41,7 +41,8 @@ public:
// Stop reviving state and do not reject the request if some condition is
// satisfied.
virtual void StopRevivingIfNecessary() = 0;
// Return true if the current state is still in reviving.
virtual bool StopRevivingIfNecessary() = 0;
};
// The default revive policy. Once no servers are available, reviving is start.
......@@ -56,7 +57,7 @@ public:
void StartReviving();
bool DoReject(const std::vector<ServerId>& server_list);
void StopRevivingIfNecessary();
bool StopRevivingIfNecessary();
private:
bool _reviving;
......
......@@ -789,7 +789,18 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
}
TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
brpc::LoadBalancer* lb = new brpc::policy::RandomizedLoadBalancer;
brpc::LoadBalancer* lb = NULL;
int rand = butil::fast_rand_less_than(4);
if (rand == 0) {
lb = new brpc::policy::RoundRobinLoadBalancer;
} else if (rand == 1) {
lb = new brpc::policy::RandomizedLoadBalancer;
} else if (rand == 2) {
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
} else {
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::MurmurHash32);
}
LOG(INFO) << "r=" << rand;
brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
butil::EndPoint dummy;
......@@ -797,12 +808,15 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
brpc::SocketOptions options;
options.remote_side = dummy;
brpc::ServerId id(8888);
id.tag = "50";
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr[i]));
lb->AddServer(id);
}
brpc::SocketUniquePtr sptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL };
int64_t hold_time_ms = 2000;
brpc::RevivePolicy* rp = new brpc::DefaultRevivePolicy(2, hold_time_ms);
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL, rp };
brpc::LoadBalancer::SelectOut out(&sptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
......@@ -831,10 +845,8 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
ASSERT_TRUE(false);
}
}
ASSERT_TRUE(abs(num_ereject - num_ok) < 20);
// TODO(zhujiashun): longer than interval
int64_t sleep_time_ms = 2010;
bthread_usleep(sleep_time_ms * 1000);
ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
bthread_usleep((hold_time_ms + 10) * 1000);
// After enough waiting time, traffic should be sent to all available servers.
for (int i = 0; i < 10; ++i) {
......@@ -932,7 +944,6 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
cntl.set_request_code(brpc::policy::MurmurHash32(&++request_code, 8));
stub.Echo(&cntl, &req, &res, NULL);
}
bthread_usleep(500000);
butil::EndPoint point(butil::IP_ANY, 7777);
brpc::Server server;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment