Commit b5178756 authored by cdjgit's avatar cdjgit

little fix && add unit test

parent b0575134
...@@ -189,7 +189,9 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -189,7 +189,9 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
// Skip this invalid server. We need calculate a new stride for server selection. // Skip this invalid server. We need calculate a new stride for server selection.
filter.emplace(server_id); filter.emplace(server_id);
remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight; remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight;
--remain_servers; if (--remain_servers == 0) {
break;
}
// Select from begining status. // Select from begining status.
tls_temp.stride = GetStride(remain_weight, remain_servers); tls_temp.stride = GetStride(remain_weight, remain_servers);
tls_temp.position = tls.position; tls_temp.position = tls.position;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "brpc/describable.h" #include "brpc/describable.h"
#include "brpc/socket.h" #include "brpc/socket.h"
#include "butil/strings/string_number_conversions.h" #include "butil/strings/string_number_conversions.h"
#include "brpc/excluded_servers.h"
#include "brpc/policy/weighted_round_robin_load_balancer.h" #include "brpc/policy/weighted_round_robin_load_balancer.h"
#include "brpc/policy/round_robin_load_balancer.h" #include "brpc/policy/round_robin_load_balancer.h"
#include "brpc/policy/randomized_load_balancer.h" #include "brpc/policy/randomized_load_balancer.h"
...@@ -266,6 +267,7 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -266,6 +267,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
global_stop = false; global_stop = false;
pthread_t th[8]; pthread_t th[8];
std::vector<brpc::ServerId> ids; std::vector<brpc::ServerId> ids;
brpc::SocketId wrr_sid_logoff = -1;
for (int i = 0; i < 256; ++i) { for (int i = 0; i < 256; ++i) {
char addr[32]; char addr[32];
snprintf(addr, sizeof(addr), "192.%d.1.%d:8080", i, i); snprintf(addr, sizeof(addr), "192.%d.1.%d:8080", i, i);
...@@ -273,7 +275,11 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -273,7 +275,11 @@ TEST_F(LoadBalancerTest, update_while_selection) {
ASSERT_EQ(0, str2endpoint(addr, &dummy)); ASSERT_EQ(0, str2endpoint(addr, &dummy));
brpc::ServerId id(8888); brpc::ServerId id(8888);
if (3 == round) { if (3 == round) {
id.tag = "1"; if (i < 255) {
id.tag = "1";
} else {
id.tag = "200000000";
}
} }
brpc::SocketOptions options; brpc::SocketOptions options;
options.remote_side = dummy; options.remote_side = dummy;
...@@ -281,6 +287,13 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -281,6 +287,13 @@ TEST_F(LoadBalancerTest, update_while_selection) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id)); ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ids.push_back(id); ids.push_back(id);
ASSERT_TRUE(lb->AddServer(id)); ASSERT_TRUE(lb->AddServer(id));
if (round == 3 && i == 255) {
wrr_sid_logoff = id.id;
// In case of wrr, set 255th socket with huge weight logoff.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
} }
std::cout << "Time " << butil::class_name_str(*lb) << " ..." << std::endl; std::cout << "Time " << butil::class_name_str(*lb) << " ..." << std::endl;
butil::Timer tm; butil::Timer tm;
...@@ -291,7 +304,11 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -291,7 +304,11 @@ TEST_F(LoadBalancerTest, update_while_selection) {
std::vector<brpc::ServerId> removed; std::vector<brpc::ServerId> removed;
const size_t REP = 200; const size_t REP = 200;
for (size_t k = 0; k < REP; ++k) { for (size_t k = 0; k < REP; ++k) {
removed = ids; if (round != 3) {
removed = ids;
} else {
removed.assign(ids.begin(), ids.begin() + 255);
}
std::random_shuffle(removed.begin(), removed.end()); std::random_shuffle(removed.begin(), removed.end());
removed.pop_back(); removed.pop_back();
ASSERT_EQ(removed.size(), lb->RemoveServersInBatch(removed)); ASSERT_EQ(removed.size(), lb->RemoveServersInBatch(removed));
...@@ -333,19 +350,30 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -333,19 +350,30 @@ TEST_F(LoadBalancerTest, update_while_selection) {
<< count * 1000000L / tm.u_elapsed() << " times/s" << count * 1000000L / tm.u_elapsed() << " times/s"
<< std::endl; << std::endl;
} }
ASSERT_EQ(ids.size(), total_count.size()); size_t id_num = ids.size();
for (size_t i = 0; i < ids.size(); ++i) { if (round == 3) {
// Do not include the logoff socket.
id_num -= 1;
}
ASSERT_EQ(id_num, total_count.size());
for (size_t i = 0; i < id_num; ++i) {
ASSERT_NE(0, total_count[ids[i].id]) << "i=" << i; ASSERT_NE(0, total_count[ids[i].id]) << "i=" << i;
std::cout << i << "=" << total_count[ids[i].id] << " "; std::cout << i << "=" << total_count[ids[i].id] << " ";
} }
std::cout << std::endl; std::cout << std::endl;
for (size_t i = 0; i < ids.size(); ++i) { for (size_t i = 0; i < id_num; ++i) {
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id)); ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
} }
ASSERT_EQ(ids.size(), nrecycle); ASSERT_EQ(ids.size(), nrecycle);
brpc::SocketId id = -1;
for (size_t i = 0; i < ids.size(); ++i) { for (size_t i = 0; i < ids.size(); ++i) {
ASSERT_EQ(1UL, total_count.erase(recycled_sockets[i])); id = recycled_sockets[i];
if (id != wrr_sid_logoff) {
ASSERT_EQ(1UL, total_count.erase(id));
} else {
ASSERT_EQ(0UL, total_count.erase(id));
}
} }
delete lb; delete lb;
} }
...@@ -563,11 +591,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -563,11 +591,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
"10.92.115.19:8831", "10.92.115.19:8831",
"10.42.108.25:8832", "10.42.108.25:8832",
"10.36.150.32:8833", "10.36.150.32:8833",
"10.36.150.32:8899",
"10.92.149.48:8834", "10.92.149.48:8834",
"10.42.122.201:8835", "10.42.122.201:8835",
"10.42.122.202:8836" "10.42.122.202:8836"
}; };
std::string weight[] = {"3", "2", "7", "1ab", "-1", "0"}; std::string weight[] = {"3", "2", "7", "200000000", "1ab", "-1", "0"};
std::map<butil::EndPoint, int> configed_weight; std::map<butil::EndPoint, int> configed_weight;
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb; brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
...@@ -582,7 +611,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -582,7 +611,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
options.user = new SaveRecycle; options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id)); ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
id.tag = weight[i]; id.tag = weight[i];
if ( i < 3 ) { if (i == 3) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
if ( i < 4 ) {
int weight_num = 0; int weight_num = 0;
ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num)); ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
configed_weight[dummy] = weight_num; configed_weight[dummy] = weight_num;
...@@ -596,7 +630,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -596,7 +630,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// There are 3 valid servers with weight 3, 2 and 7 respectively. // There are 3 valid servers with weight 3, 2 and 7 respectively.
// We run SelectServer for 12 times. The result number of each server seleted should be // We run SelectServer for 12 times. The result number of each server seleted should be
// consistent with weight configured. // consistent with weight configured.
std::map<butil::EndPoint, int> select_result; std::map<butil::EndPoint, size_t> select_result;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
...@@ -613,7 +647,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -613,7 +647,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
} }
std::cout << std::endl; std::cout << std::endl;
// Check whether slected result is consistent with expected. // Check whether slected result is consistent with expected.
EXPECT_EQ((size_t)3, select_result.size()); EXPECT_EQ(3, select_result.size());
for (const auto& result : select_result) { for (const auto& result : select_result) {
std::cout << result.first << " result=" << result.second std::cout << result.first << " result=" << result.second
<< " configured=" << configed_weight[result.first] << std::endl; << " configured=" << configed_weight[result.first] << std::endl;
...@@ -621,4 +655,45 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -621,4 +655,45 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
} }
} }
TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
const char* servers[] = {
"10.92.115.19:8831",
"10.42.108.25:8832",
"10.36.150.32:8833"
};
std::string weight[] = {"200000000", "2", "600000"};
std::map<butil::EndPoint, int> configed_weight;
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
brpc::ExcludedServers* exclude = brpc::ExcludedServers::Create(3);
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
const char *addr = servers[i];
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(addr, &dummy));
brpc::ServerId id(8888);
brpc::SocketOptions options;
options.remote_side = dummy;
options.user = new SaveRecycle;
id.tag = weight[i];
if (i < 2) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
}
EXPECT_TRUE(wrrlb.AddServer(id));
if (i == 0) {
exclude->Add(id.id);
}
if (i == 1) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
}
// The first socket is excluded. The second socket is logfoff.
// The third socket is invalid.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
brpc::LoadBalancer::SelectOut out(&ptr);
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
brpc::ExcludedServers::Destroy(exclude);
}
} //namespace } //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment