Commit 50eed9b0 authored by cdjgit's avatar cdjgit

lb configurable && consistency lb refactor

parent c7cbfb37
......@@ -108,8 +108,9 @@ const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
, ch_md5_lb(MD5Hash32)
: ch_mh_lb("murmurhash3")
, ch_md5_lb("md5")
, ch_ketama_lb("ketama")
, constant_cl(0) {
}
......@@ -129,6 +130,7 @@ struct GlobalExtensions {
LocalityAwareLoadBalancer la_lb;
ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb;
ConsistentHashingLoadBalancer ch_ketama_lb;
DynPartLoadBalancer dynpart_lb;
AutoConcurrencyLimiter auto_cl;
......@@ -350,6 +352,7 @@ static void GlobalInitializeOrDieImpl() {
LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb);
LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
// Compress Handlers
......
......@@ -62,8 +62,11 @@ SharedLoadBalancer::~SharedLoadBalancer() {
}
}
int SharedLoadBalancer::Init(const char* lb_name) {
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name);
int SharedLoadBalancer::Init(const char* lb_protocol) {
std::string lb_name;
butil::StringPairs lb_parms;
ParseParameters(lb_protocol, &lb_name, &lb_parms);
const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name.c_str());
if (lb == NULL) {
LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'";
return -1;
......@@ -74,6 +77,10 @@ int SharedLoadBalancer::Init(const char* lb_name) {
return -1;
}
_lb = lb_copy;
if (!_lb->SetParameters(lb_parms)) {
LOG(FATAL) << "Fail to set parameters of lb `" << lb_protocol << "'";
return -1;
}
if (FLAGS_show_lb_in_vars && !_exposed) {
ExposeLB();
}
......@@ -89,4 +96,20 @@ void SharedLoadBalancer::Describe(std::ostream& os,
}
}
void SharedLoadBalancer::ParseParameters(const butil::StringPiece lb_protocol,
std::string* lb_name,
butil::StringPairs* parms) {
lb_name->clear();
parms->clear();
size_t pos = lb_protocol.find(':');
if (pos == std::string::npos) {
lb_name->append(lb_protocol.data(), lb_protocol.size());
} else {
lb_name->append(lb_protocol.data(), pos);
butil::StringPiece parms_piece = lb_protocol.substr(pos + sizeof(':'));
std::string parms_str(parms_piece.data(), parms_piece.size());
butil::SplitStringIntoKeyValuePairs(parms_str, '=', ' ', parms);
}
}
} // namespace brpc
......@@ -24,6 +24,8 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T>
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
namespace brpc {
......@@ -102,6 +104,8 @@ public:
// Caller is responsible for Destroy() the instance after usage.
virtual LoadBalancer* New() const = 0;
virtual bool SetParameters(const butil::StringPairs& parms) { return true; }
protected:
virtual ~LoadBalancer() { }
};
......@@ -164,6 +168,9 @@ public:
}
private:
static void ParseParameters(const butil::StringPiece lb_protocl,
std::string* lb_name,
butil::StringPairs* parms);
static void DescribeLB(std::ostream& os, void* arg);
void ExposeLB();
......
......@@ -18,8 +18,10 @@
#include <gflags/gflags.h>
#include "butil/containers/flat_map.h"
#include "butil/errno.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/socket.h"
#include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h"
namespace brpc {
......@@ -29,16 +31,92 @@ namespace policy {
DEFINE_int32(chash_num_replicas, 100,
"default number of replicas per server in chash");
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(HashFunc hash)
: _hash(hash)
, _num_replicas(FLAGS_chash_num_replicas) {
namespace {
using HashFun = uint32_t(*)(const void*, size_t);
bool BuildReplicasDefault(const ServerId server,
const size_t num_replicas,
HashFun hash,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
for (size_t i = 0; i < num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
ConsistentHashingLoadBalancer::Node node;
node.hash = hash(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
replicas->push_back(node);
}
return true;
}
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(
HashFunc hash,
size_t num_replicas)
: _hash(hash)
, _num_replicas(num_replicas) {
bool BuildReplicasKetam(const ServerId server,
const size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
}
replicas->clear();
const size_t points_per_hash = 4;
CHECK(num_replicas % points_per_hash == 0)
<< "Ketam hash replicas number(" << num_replicas << ") should be n*4";
for (size_t i = 0; i < num_replicas / points_per_hash; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
unsigned char digest[16];
MD5HashSignature(host, len, digest);
for (size_t j = 0; j < points_per_hash; ++j) {
ConsistentHashingLoadBalancer::Node node;
node.server_sock = server;
node.server_addr = ptr->remote_side();
node.hash = ((uint32_t) (digest[3 + j * 4] & 0xFF) << 24)
| ((uint32_t) (digest[2 + j * 4] & 0xFF) << 16)
| ((uint32_t) (digest[1 + j * 4] & 0xFF) << 8)
| (digest[0 + j * 4] & 0xFF);
replicas->push_back(node);
}
}
return true;
}
} // namespace
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(const char* name)
: _num_replicas(FLAGS_chash_num_replicas), _name(name) {
Init(_name);
}
void ConsistentHashingLoadBalancer::Init(const std::string& name) {
if (name.compare("murmurhash3") == 0) {
_build_replicas = std::bind(BuildReplicasDefault,
std::placeholders::_1,
std::placeholders::_2,
MurmurHash32,
std::placeholders::_3);
return;
}
if (name.compare("md5") == 0) {
_build_replicas = std::bind(BuildReplicasDefault,
std::placeholders::_1,
std::placeholders::_2,
MD5Hash32,
std::placeholders::_3);
return;
}
if (name.compare("ketama") == 0) {
_build_replicas = BuildReplicasKetam;
return;
}
CHECK(false) << "Failed to init consistency hash load balancer of \'" << name << '\'';
}
size_t ConsistentHashingLoadBalancer::AddBatch(
......@@ -112,20 +190,9 @@ size_t ConsistentHashingLoadBalancer::Remove(
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
std::vector<Node> add_nodes;
add_nodes.reserve(_num_replicas);
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
if (!_build_replicas(server, _num_replicas, &add_nodes)) {
return false;
}
for (size_t i = 0; i < _num_replicas; ++i) {
char host[32];
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
Node node;
node.hash = _hash(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
}
std::sort(add_nodes.begin(), add_nodes.end());
bool executed = false;
const size_t ret = _db_hash_ring.ModifyWithForeground(
......@@ -138,23 +205,12 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch(
const std::vector<ServerId> &servers) {
std::vector<Node> add_nodes;
add_nodes.reserve(servers.size() * _num_replicas);
std::vector<Node> replicas;
replicas.reserve(_num_replicas);
for (size_t i = 0; i < servers.size(); ++i) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(servers[i].id, &ptr) == -1) {
continue;
}
for (size_t rep = 0; rep < _num_replicas; ++rep) {
char host[32];
// To be compatible with libmemcached, we formulate the key of
// a virtual node as `|address|-|replica_index|', see
// http://fe.baidu.com/-1bszwnf at line 297.
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), rep);
Node node;
node.hash = _hash(host, len);
node.server_sock = servers[i];
node.server_addr = ptr->remote_side();
add_nodes.push_back(node);
replicas.clear();
if (_build_replicas(servers[i], _num_replicas, &replicas)) {
add_nodes.insert(add_nodes.end(), replicas.begin(), replicas.end());
}
}
std::sort(add_nodes.begin(), add_nodes.end());
......@@ -188,7 +244,7 @@ size_t ConsistentHashingLoadBalancer::RemoveServersInBatch(
}
LoadBalancer *ConsistentHashingLoadBalancer::New() const {
return new (std::nothrow) ConsistentHashingLoadBalancer(_hash);
return new (std::nothrow) ConsistentHashingLoadBalancer(_name.c_str());
}
void ConsistentHashingLoadBalancer::Destroy() {
......@@ -232,8 +288,6 @@ int ConsistentHashingLoadBalancer::SelectServer(
return EHOSTDOWN;
}
extern const char *GetHashName(uint32_t (*hasher)(const void* key, size_t len));
void ConsistentHashingLoadBalancer::Describe(
std::ostream &os, const DescribeOptions& options) {
if (!options.verbose) {
......@@ -241,7 +295,7 @@ void ConsistentHashingLoadBalancer::Describe(
return;
}
os << "ConsistentHashingLoadBalancer {\n"
<< " hash function: " << GetHashName(_hash) << '\n'
<< " hash function: " << _name << '\n'
<< " replica per host: " << _num_replicas << '\n';
std::map<butil::EndPoint, double> load_map;
GetLoads(&load_map);
......@@ -289,5 +343,22 @@ void ConsistentHashingLoadBalancer::GetLoads(
}
}
bool ConsistentHashingLoadBalancer::SetParameters(const butil::StringPairs& parms) {
for (const std::pair<std::string, std::string>& parm : parms) {
if (parm.first.compare("replicas") == 0) {
size_t replicas = 0;
if (butil::StringToSizeT(parm.second, &replicas)) {
_num_replicas = replicas;
} else {
return false;
}
} else {
return false;
}
}
return true;
}
} // namespace policy
} // namespace brpc
......@@ -18,6 +18,7 @@
#define BRPC_CONSISTENT_HASHING_LOAD_BALANCER_H
#include <stdint.h> // uint32_t
#include <functional>
#include <vector> // std::vector
#include "butil/endpoint.h" // butil::EndPoint
#include "butil/containers/doubly_buffered_data.h"
......@@ -29,20 +30,6 @@ namespace policy {
class ConsistentHashingLoadBalancer : public LoadBalancer {
public:
typedef uint32_t (*HashFunc)(const void* key, size_t len);
explicit ConsistentHashingLoadBalancer(HashFunc hash);
ConsistentHashingLoadBalancer(HashFunc hash, size_t num_replicas);
bool AddServer(const ServerId& server);
bool RemoveServer(const ServerId& server);
size_t AddServersInBatch(const std::vector<ServerId> &servers);
size_t RemoveServersInBatch(const std::vector<ServerId> &servers);
LoadBalancer *New() const;
void Destroy();
int SelectServer(const SelectIn &in, SelectOut *out);
void Describe(std::ostream &os, const DescribeOptions& options);
private:
void GetLoads(std::map<butil::EndPoint, double> *load_map);
struct Node {
uint32_t hash;
ServerId server_sock;
......@@ -56,14 +43,31 @@ private:
return hash < code;
}
};
using BuildReplicasFunc =
std::function<bool (const ServerId server, const size_t num_replicas, std::vector<Node>* replicas)>;
explicit ConsistentHashingLoadBalancer(const char* name);
bool AddServer(const ServerId& server);
bool RemoveServer(const ServerId& server);
size_t AddServersInBatch(const std::vector<ServerId> &servers);
size_t RemoveServersInBatch(const std::vector<ServerId> &servers);
LoadBalancer *New() const;
void Destroy();
int SelectServer(const SelectIn &in, SelectOut *out);
void Describe(std::ostream &os, const DescribeOptions& options);
virtual bool SetParameters(const butil::StringPairs& parms);
private:
void Init(const std::string& name);
void GetLoads(std::map<butil::EndPoint, double> *load_map);
static size_t AddBatch(std::vector<Node> &bg, const std::vector<Node> &fg,
const std::vector<Node> &servers, bool *executed);
static size_t RemoveBatch(std::vector<Node> &bg, const std::vector<Node> &fg,
const std::vector<ServerId> &servers, bool *executed);
static size_t Remove(std::vector<Node> &bg, const std::vector<Node> &fg,
const ServerId& server, bool *executed);
HashFunc _hash;
BuildReplicasFunc _build_replicas;
size_t _num_replicas;
std::string _name;
butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
};
......
......@@ -23,12 +23,16 @@
namespace brpc {
namespace policy {
uint32_t MD5Hash32(const void* key, size_t len) {
uint32_t MD5HashSignature(const void* key, size_t len, unsigned char* results) {
MD5_CTX my_md5;
MD5_Init(&my_md5);
MD5_Update(&my_md5, (const unsigned char *)key, len);
unsigned char results[16];
MD5_Final(results, &my_md5);
}
uint32_t MD5Hash32(const void* key, size_t len) {
unsigned char results[16];
MD5HashSignature(key, len, results);
return ((uint32_t) (results[3] & 0xFF) << 24)
| ((uint32_t) (results[2] & 0xFF) << 16)
| ((uint32_t) (results[1] & 0xFF) << 8)
......
......@@ -25,6 +25,7 @@
namespace brpc {
namespace policy {
uint32_t MD5HashSignature(const void* key, size_t len, unsigned char* results);
uint32_t MD5Hash32(const void* key, size_t len);
uint32_t MD5Hash32V(const butil::StringPiece* keys, size_t num_keys);
......
......@@ -303,6 +303,9 @@ public:
// Always succeed even if this socket is failed.
void ReAddress(SocketUniquePtr* ptr);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
// Mark this Socket or the Socket associated with `id' as failed.
// Any later Address() of the identifier shall return NULL unless the
// Socket was revivied by HealthCheckThread. The Socket is NOT recycled
......@@ -550,9 +553,6 @@ friend void DereferenceSocket(Socket*);
int ResetFileDescriptor(int fd);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment