Unverified Commit 2ef40e6b authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #592 from cdjingit/lb_configurable_and_consistency_lb_refactor_caidaojin

lb configurable && consistency hash lb refactor
parents 1b9e0064 4a7c3c5a
......@@ -113,8 +113,9 @@ const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
, ch_md5_lb(MD5Hash32)
: ch_mh_lb(CONS_HASH_LB_MURMUR3)
, ch_md5_lb(CONS_HASH_LB_MD5)
, ch_ketama_lb(CONS_HASH_LB_KETAMA)
, constant_cl(0) {
}
......@@ -134,6 +135,7 @@ struct GlobalExtensions {
LocalityAwareLoadBalancer la_lb;
ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb;
ConsistentHashingLoadBalancer ch_ketama_lb;
DynPartLoadBalancer dynpart_lb;
AutoConcurrencyLimiter auto_cl;
......@@ -355,6 +357,7 @@ static void GlobalInitializeOrDieImpl() {
LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb);
LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
// Compress Handlers
......
......@@ -62,18 +62,23 @@ SharedLoadBalancer::~SharedLoadBalancer() {
}
}
int SharedLoadBalancer::Init(const char* lb_name) {
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name);
int SharedLoadBalancer::Init(const char* lb_protocol) {
std::string lb_name;
butil::StringPiece lb_params;
if (!ParseParameters(lb_protocol, &lb_name, &lb_params)) {
LOG(FATAL) << "Fail to parse this load balancer protocol '" << lb_protocol << '\'';
return -1;
}
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name.c_str());
if (lb == NULL) {
LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'";
return -1;
}
LoadBalancer* lb_copy = lb->New();
if (lb_copy == NULL) {
_lb = lb->New(lb_params);
if (_lb == NULL) {
LOG(FATAL) << "Fail to new LoadBalancer";
return -1;
}
_lb = lb_copy;
if (FLAGS_show_lb_in_vars && !_exposed) {
ExposeLB();
}
......@@ -89,4 +94,26 @@ void SharedLoadBalancer::Describe(std::ostream& os,
}
}
bool SharedLoadBalancer::ParseParameters(const butil::StringPiece& lb_protocol,
std::string* lb_name,
butil::StringPiece* lb_params) {
lb_name->clear();
lb_params->clear();
if (lb_protocol.empty()) {
return false;
}
const char separator = ':';
size_t pos = lb_protocol.find(separator);
if (pos == std::string::npos) {
lb_name->append(lb_protocol.data(), lb_protocol.size());
} else {
lb_name->append(lb_protocol.data(), pos);
if (pos < lb_protocol.size() - sizeof(separator)) {
*lb_params = lb_protocol.substr(pos + sizeof(separator));
}
}
return true;
}
} // namespace brpc
......@@ -24,6 +24,8 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T>
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
namespace brpc {
......@@ -100,7 +102,7 @@ public:
// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual LoadBalancer* New() const = 0;
virtual LoadBalancer* New(const butil::StringPiece& params) const = 0;
protected:
virtual ~LoadBalancer() { }
......@@ -164,6 +166,9 @@ public:
}
private:
static bool ParseParameters(const butil::StringPiece& lb_protocol,
std::string* lb_name,
butil::StringPiece* lb_params);
static void DescribeLB(std::ostream& os, void* arg);
void ExposeLB();
......
......@@ -18,8 +18,10 @@
#include <gflags/gflags.h>
#include "butil/containers/flat_map.h"
#include "butil/errno.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/socket.h"
#include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h"
namespace brpc {
......@@ -29,16 +31,119 @@ namespace policy {
DEFINE_int32(chash_num_replicas, 100,
"default number of replicas per server in chash");
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(HashFunc hash)
: _hash(hash)
, _num_replicas(FLAGS_chash_num_replicas) {
// Defined in hasher.cpp.
const char* GetHashName(HashFunc hasher);
class ReplicaPolicy {
public:
virtual ~ReplicaPolicy() = default;
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const = 0;
virtual const char* name() const = 0;
};
class DefaultReplicaPolicy : public ReplicaPolicy {
public:
DefaultReplicaPolicy(HashFunc hash) : _hash_func(hash) {}
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;
virtual const char* name() const { return GetHashName(_hash_func); }
private:
HashFunc _hash_func;
};
bool DefaultReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
for (size_t i = 0; i < num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
ConsistentHashingLoadBalancer::Node node;
node.hash = _hash_func(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
replicas->push_back(node);
}
return true;
}
class KetamaReplicaPolicy : public ReplicaPolicy {
public:
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;
virtual const char* name() const { return "ketama"; }
};
bool KetamaReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
const size_t points_per_hash = 4;
CHECK(num_replicas % points_per_hash == 0)
<< "Ketam hash replicas number(" << num_replicas << ") should be n*4";
for (size_t i = 0; i < num_replicas / points_per_hash; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
unsigned char digest[16];
MD5HashSignature(host, len, digest);
for (size_t j = 0; j < points_per_hash; ++j) {
ConsistentHashingLoadBalancer::Node node;
node.server_sock = server;
node.server_addr = ptr->remote_side();
node.hash = ((uint32_t) (digest[3 + j * 4] & 0xFF) << 24)
| ((uint32_t) (digest[2 + j * 4] & 0xFF) << 16)
| ((uint32_t) (digest[1 + j * 4] & 0xFF) << 8)
| (digest[0 + j * 4] & 0xFF);
replicas->push_back(node);
}
}
return true;
}
namespace {
pthread_once_t s_replica_policy_once = PTHREAD_ONCE_INIT;
const std::array<const ReplicaPolicy*, CONS_HASH_LB_LAST>* g_replica_policy = nullptr;
void InitReplicaPolicy() {
g_replica_policy = new std::array<const ReplicaPolicy*, CONS_HASH_LB_LAST>({
new DefaultReplicaPolicy(MurmurHash32),
new DefaultReplicaPolicy(MD5Hash32),
new KetamaReplicaPolicy
});
}
inline const ReplicaPolicy* GetReplicaPolicy(ConsistentHashingLoadBalancerType type) {
pthread_once(&s_replica_policy_once, InitReplicaPolicy);
return g_replica_policy->at(type);
}
} // namespace
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(
HashFunc hash,
size_t num_replicas)
: _hash(hash)
, _num_replicas(num_replicas) {
ConsistentHashingLoadBalancerType type)
: _num_replicas(FLAGS_chash_num_replicas), _type(type) {
CHECK(GetReplicaPolicy(_type))
<< "Fail to find replica policy for consistency lb type: '" << _type << '\'';
}
size_t ConsistentHashingLoadBalancer::AddBatch(
......@@ -112,20 +217,9 @@ size_t ConsistentHashingLoadBalancer::Remove(
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
std::vector<Node> add_nodes;
add_nodes.reserve(_num_replicas);
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
if (!GetReplicaPolicy(_type)->Build(server, _num_replicas, &add_nodes)) {
return false;
}
for (size_t i = 0; i < _num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
Node node;
node.hash = _hash(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
}
std::sort(add_nodes.begin(), add_nodes.end());
bool executed = false;
const size_t ret = _db_hash_ring.ModifyWithForeground(
......@@ -138,23 +232,12 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch(
const std::vector<ServerId> &servers) {
std::vector<Node> add_nodes;
add_nodes.reserve(servers.size() * _num_replicas);
std::vector<Node> replicas;
replicas.reserve(_num_replicas);
for (size_t i = 0; i < servers.size(); ++i) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(servers[i].id, &ptr) == -1) {
continue;
}
for (size_t rep = 0; rep < _num_replicas; ++rep) {
char host[32];
// To be compatible with libmemcached, we formulate the key of
// a virtual node as `|address|-|replica_index|', see
// http://fe.baidu.com/-1bszwnf at line 297.
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), rep);
Node node;
node.hash = _hash(host, len);
node.server_sock = servers[i];
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
replicas.clear();
if (GetReplicaPolicy(_type)->Build(servers[i], _num_replicas, &replicas)) {
add_nodes.insert(add_nodes.end(), replicas.begin(), replicas.end());
}
}
std::sort(add_nodes.begin(), add_nodes.end());
......@@ -187,8 +270,14 @@ size_t ConsistentHashingLoadBalancer::RemoveServersInBatch(
return n;
}
LoadBalancer *ConsistentHashingLoadBalancer::New() const {
return new (std::nothrow) ConsistentHashingLoadBalancer(_hash);
LoadBalancer *ConsistentHashingLoadBalancer::New(const butil::StringPiece& params) const {
ConsistentHashingLoadBalancer* lb =
new (std::nothrow) ConsistentHashingLoadBalancer(_type);
if (lb != nullptr && !lb->SetParameters(params)) {
delete lb;
lb = nullptr;
}
return lb;
}
void ConsistentHashingLoadBalancer::Destroy() {
......@@ -232,8 +321,6 @@ int ConsistentHashingLoadBalancer::SelectServer(
return EHOSTDOWN;
}
extern const char *GetHashName(uint32_t (*hasher)(const void* key, size_t len));
void ConsistentHashingLoadBalancer::Describe(
std::ostream &os, const DescribeOptions& options) {
if (!options.verbose) {
......@@ -241,7 +328,7 @@ void ConsistentHashingLoadBalancer::Describe(
return;
}
os << "ConsistentHashingLoadBalancer {\n"
<< " hash function: " << GetHashName(_hash) << '\n'
<< " hash function: " << GetReplicaPolicy(_type)->name() << '\n'
<< " replica per host: " << _num_replicas << '\n';
std::map<butil::EndPoint, double> load_map;
GetLoads(&load_map);
......@@ -289,5 +376,24 @@ void ConsistentHashingLoadBalancer::GetLoads(
}
}
bool ConsistentHashingLoadBalancer::SetParameters(const butil::StringPiece& params) {
for (butil::StringSplitter sp(params.begin(), params.end(), ' '); sp != nullptr; ++sp) {
butil::StringPiece key_value(sp.field(), sp.length());
size_t p = key_value.find('=');
if (p == key_value.npos || p == key_value.size() - 1) {
// No value configed.
return false;
}
if (key_value.substr(0, p) == "replicas") {
if (!butil::StringToSizeT(key_value.substr(p + 1), &_num_replicas)) {
return false;
}
continue;
}
LOG(ERROR) << "Failed to set this unknown parameters " << key_value;
}
return true;
}
} // namespace policy
} // namespace brpc
......@@ -18,6 +18,7 @@
#define BRPC_CONSISTENT_HASHING_LOAD_BALANCER_H
#include <stdint.h> // uint32_t
#include <functional>
#include <vector> // std::vector
#include "butil/endpoint.h" // butil::EndPoint
#include "butil/containers/doubly_buffered_data.h"
......@@ -27,22 +28,19 @@
namespace brpc {
namespace policy {
class ReplicaPolicy;
enum ConsistentHashingLoadBalancerType {
CONS_HASH_LB_MURMUR3 = 0,
CONS_HASH_LB_MD5 = 1,
CONS_HASH_LB_KETAMA = 2,
// Identify the last one.
CONS_HASH_LB_LAST = 3
};
class ConsistentHashingLoadBalancer : public LoadBalancer {
public:
typedef uint32_t (*HashFunc)(const void* key, size_t len);
explicit ConsistentHashingLoadBalancer(HashFunc hash);
ConsistentHashingLoadBalancer(HashFunc hash, size_t num_replicas);
bool AddServer(const ServerId& server);
bool RemoveServer(const ServerId& server);
size_t AddServersInBatch(const std::vector<ServerId> &servers);
size_t RemoveServersInBatch(const std::vector<ServerId> &servers);
LoadBalancer *New() const;
void Destroy();
int SelectServer(const SelectIn &in, SelectOut *out);
void Describe(std::ostream &os, const DescribeOptions& options);
private:
void GetLoads(std::map<butil::EndPoint, double> *load_map);
struct Node {
uint32_t hash;
ServerId server_sock;
......@@ -56,14 +54,27 @@ private:
return hash < code;
}
};
explicit ConsistentHashingLoadBalancer(ConsistentHashingLoadBalancerType type);
bool AddServer(const ServerId& server);
bool RemoveServer(const ServerId& server);
size_t AddServersInBatch(const std::vector<ServerId> &servers);
size_t RemoveServersInBatch(const std::vector<ServerId> &servers);
LoadBalancer *New(const butil::StringPiece& params) const;
void Destroy();
int SelectServer(const SelectIn &in, SelectOut *out);
void Describe(std::ostream &os, const DescribeOptions& options);
private:
bool SetParameters(const butil::StringPiece& params);
void GetLoads(std::map<butil::EndPoint, double> *load_map);
static size_t AddBatch(std::vector<Node> &bg, const std::vector<Node> &fg,
const std::vector<Node> &servers, bool *executed);
static size_t RemoveBatch(std::vector<Node> &bg, const std::vector<Node> &fg,
const std::vector<ServerId> &servers, bool *executed);
static size_t Remove(std::vector<Node> &bg, const std::vector<Node> &fg,
const ServerId& server, bool *executed);
HashFunc _hash;
size_t _num_replicas;
ConsistentHashingLoadBalancerType _type;
butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
};
......
......@@ -159,7 +159,7 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return EHOSTDOWN;
}
DynPartLoadBalancer* DynPartLoadBalancer::New() const {
DynPartLoadBalancer* DynPartLoadBalancer::New(const butil::StringPiece&) const {
return new (std::nothrow) DynPartLoadBalancer;
}
......
......@@ -36,7 +36,7 @@ public:
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
DynPartLoadBalancer* New() const;
DynPartLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
......
......@@ -23,12 +23,16 @@
namespace brpc {
namespace policy {
uint32_t MD5Hash32(const void* key, size_t len) {
void MD5HashSignature(const void* key, size_t len, unsigned char* results) {
MD5_CTX my_md5;
MD5_Init(&my_md5);
MD5_Update(&my_md5, (const unsigned char *)key, len);
unsigned char results[16];
MD5_Final(results, &my_md5);
}
uint32_t MD5Hash32(const void* key, size_t len) {
unsigned char results[16];
MD5HashSignature(key, len, results);
return ((uint32_t) (results[3] & 0xFF) << 24)
| ((uint32_t) (results[2] & 0xFF) << 16)
| ((uint32_t) (results[1] & 0xFF) << 8)
......@@ -147,7 +151,7 @@ uint32_t CRCHash32(const void* key, size_t len) {
return ((~crc) >> 16) & 0x7fff;
}
const char *GetHashName(uint32_t (*hasher)(const void* key, size_t len)) {
const char *GetHashName(HashFunc hasher) {
if (hasher == MurmurHash32) {
return "murmurhash3";
}
......@@ -157,6 +161,7 @@ const char *GetHashName(uint32_t (*hasher)(const void* key, size_t len)) {
if (hasher == CRCHash32) {
return "crc32";
}
return "user_defined";
}
......
......@@ -25,6 +25,9 @@
namespace brpc {
namespace policy {
using HashFunc = uint32_t(*)(const void*, size_t);
void MD5HashSignature(const void* key, size_t len, unsigned char* results);
uint32_t MD5Hash32(const void* key, size_t len);
uint32_t MD5Hash32V(const butil::StringPiece* keys, size_t num_keys);
......
......@@ -460,7 +460,8 @@ int64_t LocalityAwareLoadBalancer::Weight::Update(
return ResetWeight(index, end_time_us);
}
LocalityAwareLoadBalancer* LocalityAwareLoadBalancer::New() const {
LocalityAwareLoadBalancer* LocalityAwareLoadBalancer::New(
const butil::StringPiece&) const {
return new (std::nothrow) LocalityAwareLoadBalancer;
}
......
......@@ -44,7 +44,7 @@ public:
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
LocalityAwareLoadBalancer* New() const;
LocalityAwareLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
int SelectServer(const SelectIn& in, SelectOut* out);
void Feedback(const CallInfo& info);
......
......@@ -134,7 +134,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return EHOSTDOWN;
}
RandomizedLoadBalancer* RandomizedLoadBalancer::New() const {
RandomizedLoadBalancer* RandomizedLoadBalancer::New(
const butil::StringPiece&) const {
return new (std::nothrow) RandomizedLoadBalancer;
}
......
......@@ -36,7 +36,7 @@ public:
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
RandomizedLoadBalancer* New() const;
RandomizedLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream& os, const DescribeOptions&);
......
......@@ -131,7 +131,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return EHOSTDOWN;
}
RoundRobinLoadBalancer* RoundRobinLoadBalancer::New() const {
RoundRobinLoadBalancer* RoundRobinLoadBalancer::New(
const butil::StringPiece&) const {
return new (std::nothrow) RoundRobinLoadBalancer;
}
......
......@@ -35,7 +35,7 @@ public:
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
RoundRobinLoadBalancer* New() const;
RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
......
......@@ -239,7 +239,8 @@ SocketId WeightedRoundRobinLoadBalancer::GetServerInNextStride(
return final_server;
}
LoadBalancer* WeightedRoundRobinLoadBalancer::New() const {
LoadBalancer* WeightedRoundRobinLoadBalancer::New(
const butil::StringPiece&) const {
return new (std::nothrow) WeightedRoundRobinLoadBalancer;
}
......
......@@ -35,7 +35,7 @@ public:
size_t AddServersInBatch(const std::vector<ServerId>& servers);
size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
int SelectServer(const SelectIn& in, SelectOut* out);
LoadBalancer* New() const;
LoadBalancer* New(const butil::StringPiece&) const;
void Destroy();
void Describe(std::ostream&, const DescribeOptions& options);
......
......@@ -303,6 +303,9 @@ public:
// Always succeed even if this socket is failed.
void ReAddress(SocketUniquePtr* ptr);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
// Mark this Socket or the Socket associated with `id' as failed.
// Any later Address() of the identifier shall return NULL unless the
// Socket was revivied by HealthCheckThread. The Socket is NOT recycled
......@@ -556,9 +559,6 @@ friend void DereferenceSocket(Socket*);
int ResetFileDescriptor(int fd);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
......
......@@ -26,6 +26,7 @@
namespace brpc {
namespace policy {
extern uint32_t CRCHash32(const char *key, size_t len);
extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
}}
namespace {
......@@ -251,8 +252,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
} else if (round == 3) {
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
} else {
lb = new brpc::policy::ConsistentHashingLoadBalancer(
::brpc::policy::MurmurHash32);
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
sa.hash = ::brpc::policy::MurmurHash32;
}
sa.lb = lb;
......@@ -392,8 +392,7 @@ TEST_F(LoadBalancerTest, fairness) {
} else if (3 == round || 4 == round) {
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
} else {
lb = new brpc::policy::ConsistentHashingLoadBalancer(
brpc::policy::MurmurHash32);
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::CONS_HASH_LB_MURMUR3);
sa.hash = brpc::policy::MurmurHash32;
}
sa.lb = lb;
......@@ -514,11 +513,19 @@ TEST_F(LoadBalancerTest, fairness) {
}
TEST_F(LoadBalancerTest, consistent_hashing) {
::brpc::policy::ConsistentHashingLoadBalancer::HashFunc hashs[] = {
::brpc::policy::HashFunc hashs[::brpc::policy::CONS_HASH_LB_LAST] = {
::brpc::policy::MurmurHash32,
::brpc::policy::MD5Hash32,
::brpc::policy::MD5Hash32
// ::brpc::policy::CRCHash32 crc is a bad hash function in test
};
::brpc::policy::ConsistentHashingLoadBalancerType hash_type[::brpc::policy::CONS_HASH_LB_LAST] = {
::brpc::policy::CONS_HASH_LB_MURMUR3,
::brpc::policy::CONS_HASH_LB_MD5,
::brpc::policy::CONS_HASH_LB_KETAMA
};
const char* servers[] = {
"10.92.115.19:8833",
"10.42.108.25:8833",
......@@ -527,7 +534,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
"10.42.122.201:8833",
};
for (size_t round = 0; round < ARRAY_SIZE(hashs); ++round) {
brpc::policy::ConsistentHashingLoadBalancer chlb(hashs[round]);
brpc::policy::ConsistentHashingLoadBalancer chlb(hash_type[round]);
std::vector<brpc::ServerId> ids;
std::vector<butil::EndPoint> addrs;
for (int j = 0;j < 5; ++j)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment