Unverified Commit aceaebb4 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #547 from cdjingit/wrr_algo_enhancement_caidaojin

wrr lb algo enhancement
parents b0533d17 646b129f
......@@ -171,44 +171,70 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
tls.remain_server.id != s->server_list[tls.position].id) {
tls.remain_server.weight = 0;
}
for (uint64_t i = 0; i != tls.stride; ++i) {
SocketId server_id = GetServerInNextStride(s->server_list, tls);
// The servers that can not be choosed.
std::unordered_set<SocketId> filter;
TLS tls_temp = tls;
uint64_t remain_weight = s->weight_sum;
size_t remain_servers = s->server_list.size();
while (remain_servers > 0) {
SocketId server_id = GetServerInNextStride(s->server_list, filter, tls_temp);
if (!ExcludedServers::IsExcluded(in.excluded, server_id)
&& Socket::Address(server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) {
// update tls.
tls.remain_server = tls_temp.remain_server;
tls.position = tls_temp.position;
return 0;
} else {
// Skip this invalid server. We need calculate a new stride for server selection.
if (--remain_servers == 0) {
break;
}
filter.emplace(server_id);
remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight;
// Select from begining status.
tls_temp.stride = GetStride(remain_weight, remain_servers);
tls_temp.position = tls.position;
tls_temp.remain_server = tls.remain_server;
}
}
return EHOSTDOWN;
}
SocketId WeightedRoundRobinLoadBalancer::GetServerInNextStride(
const std::vector<Server>& server_list, TLS& tls) {
SocketId final_server = 0;
const std::vector<Server>& server_list,
const std::unordered_set<SocketId>& filter,
TLS& tls) {
SocketId final_server = INVALID_SOCKET_ID;
uint64_t stride = tls.stride;
if (tls.remain_server.weight > 0) {
final_server = tls.remain_server.id;
if (tls.remain_server.weight > stride) {
tls.remain_server.weight -= stride;
return final_server;
} else {
stride -= tls.remain_server.weight;
tls.remain_server.weight = 0;
++tls.position;
tls.position %= server_list.size();
Server& remain = tls.remain_server;
if (remain.weight > 0) {
if (filter.count(remain.id) == 0) {
final_server = remain.id;
if (remain.weight > stride) {
remain.weight -= stride;
return final_server;
} else {
stride -= remain.weight;
}
}
remain.weight = 0;
++tls.position;
tls.position %= server_list.size();
}
while (stride > 0) {
uint32_t configured_weight = server_list[tls.position].weight;
final_server = server_list[tls.position].id;
if (configured_weight > stride) {
tls.remain_server.id = final_server;
tls.remain_server.weight = configured_weight - stride;
return final_server;
if (filter.count(final_server) == 0) {
uint32_t configured_weight = server_list[tls.position].weight;
if (configured_weight > stride) {
remain.id = final_server;
remain.weight = configured_weight - stride;
return final_server;
}
stride -= configured_weight;
}
stride -= configured_weight;
++tls.position;
tls.position %= server_list.size();
tls.position %= server_list.size();
}
return final_server;
}
......
......@@ -19,6 +19,7 @@
#include <map>
#include <vector>
#include <unordered_set>
#include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h"
......@@ -75,7 +76,8 @@ private:
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
static SocketId GetServerInNextStride(const std::vector<Server>& server_list,
TLS& tls);
const std::unordered_set<SocketId>& filter,
TLS& tls);
butil::DoublyBufferedData<Servers, TLS> _db_servers;
};
......
......@@ -15,6 +15,7 @@
#include "brpc/describable.h"
#include "brpc/socket.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/excluded_servers.h"
#include "brpc/policy/weighted_round_robin_load_balancer.h"
#include "brpc/policy/round_robin_load_balancer.h"
#include "brpc/policy/randomized_load_balancer.h"
......@@ -266,6 +267,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
global_stop = false;
pthread_t th[8];
std::vector<brpc::ServerId> ids;
brpc::SocketId wrr_sid_logoff = -1;
for (int i = 0; i < 256; ++i) {
char addr[32];
snprintf(addr, sizeof(addr), "192.%d.1.%d:8080", i, i);
......@@ -273,7 +275,11 @@ TEST_F(LoadBalancerTest, update_while_selection) {
ASSERT_EQ(0, str2endpoint(addr, &dummy));
brpc::ServerId id(8888);
if (3 == round) {
id.tag = "1";
if (i < 255) {
id.tag = "1";
} else {
id.tag = "200000000";
}
}
brpc::SocketOptions options;
options.remote_side = dummy;
......@@ -281,6 +287,13 @@ TEST_F(LoadBalancerTest, update_while_selection) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ids.push_back(id);
ASSERT_TRUE(lb->AddServer(id));
if (round == 3 && i == 255) {
wrr_sid_logoff = id.id;
// In case of wrr, set 255th socket with huge weight logoff.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
}
std::cout << "Time " << butil::class_name_str(*lb) << " ..." << std::endl;
butil::Timer tm;
......@@ -291,7 +304,11 @@ TEST_F(LoadBalancerTest, update_while_selection) {
std::vector<brpc::ServerId> removed;
const size_t REP = 200;
for (size_t k = 0; k < REP; ++k) {
removed = ids;
if (round != 3) {
removed = ids;
} else {
removed.assign(ids.begin(), ids.begin() + 255);
}
std::random_shuffle(removed.begin(), removed.end());
removed.pop_back();
ASSERT_EQ(removed.size(), lb->RemoveServersInBatch(removed));
......@@ -333,19 +350,30 @@ TEST_F(LoadBalancerTest, update_while_selection) {
<< count * 1000000L / tm.u_elapsed() << " times/s"
<< std::endl;
}
ASSERT_EQ(ids.size(), total_count.size());
for (size_t i = 0; i < ids.size(); ++i) {
size_t id_num = ids.size();
if (round == 3) {
// Do not include the logoff socket.
id_num -= 1;
}
ASSERT_EQ(id_num, total_count.size());
for (size_t i = 0; i < id_num; ++i) {
ASSERT_NE(0, total_count[ids[i].id]) << "i=" << i;
std::cout << i << "=" << total_count[ids[i].id] << " ";
}
std::cout << std::endl;
for (size_t i = 0; i < ids.size(); ++i) {
for (size_t i = 0; i < id_num; ++i) {
ASSERT_EQ(0, brpc::Socket::SetFailed(ids[i].id));
}
ASSERT_EQ(ids.size(), nrecycle);
brpc::SocketId id = -1;
for (size_t i = 0; i < ids.size(); ++i) {
ASSERT_EQ(1UL, total_count.erase(recycled_sockets[i]));
id = recycled_sockets[i];
if (id != wrr_sid_logoff) {
ASSERT_EQ(1UL, total_count.erase(id));
} else {
ASSERT_EQ(0UL, total_count.erase(id));
}
}
delete lb;
}
......@@ -563,11 +591,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
"10.92.115.19:8831",
"10.42.108.25:8832",
"10.36.150.32:8833",
"10.36.150.32:8899",
"10.92.149.48:8834",
"10.42.122.201:8835",
"10.42.122.202:8836"
};
std::string weight[] = {"3", "2", "7", "1ab", "-1", "0"};
std::string weight[] = {"3", "2", "7", "200000000", "1ab", "-1", "0"};
std::map<butil::EndPoint, int> configed_weight;
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
......@@ -582,7 +611,12 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
id.tag = weight[i];
if ( i < 3 ) {
if (i == 3) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
if ( i < 4 ) {
int weight_num = 0;
ASSERT_TRUE(butil::StringToInt(weight[i], &weight_num));
configed_weight[dummy] = weight_num;
......@@ -596,7 +630,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// There are 3 valid servers with weight 3, 2 and 7 respectively.
// We run SelectServer for 12 times. The result number of each server seleted should be
// consistent with weight configured.
std::map<butil::EndPoint, int> select_result;
std::map<butil::EndPoint, size_t> select_result;
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
......@@ -613,7 +647,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
}
std::cout << std::endl;
// Check whether slected result is consistent with expected.
EXPECT_EQ((size_t)3, select_result.size());
EXPECT_EQ(3, select_result.size());
for (const auto& result : select_result) {
std::cout << result.first << " result=" << result.second
<< " configured=" << configed_weight[result.first] << std::endl;
......@@ -621,4 +655,45 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
}
}
TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
const char* servers[] = {
"10.92.115.19:8831",
"10.42.108.25:8832",
"10.36.150.32:8833"
};
std::string weight[] = {"200000000", "2", "600000"};
std::map<butil::EndPoint, int> configed_weight;
brpc::policy::WeightedRoundRobinLoadBalancer wrrlb;
brpc::ExcludedServers* exclude = brpc::ExcludedServers::Create(3);
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
const char *addr = servers[i];
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(addr, &dummy));
brpc::ServerId id(8888);
brpc::SocketOptions options;
options.remote_side = dummy;
options.user = new SaveRecycle;
id.tag = weight[i];
if (i < 2) {
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
}
EXPECT_TRUE(wrrlb.AddServer(id));
if (i == 0) {
exclude->Add(id.id);
}
if (i == 1) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr));
ptr->SetLogOff();
}
}
// The first socket is excluded. The second socket is logfoff.
// The third socket is invalid.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
brpc::LoadBalancer::SelectOut out(&ptr);
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
brpc::ExcludedServers::Destroy(exclude);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment