Commit 37d2a3bd authored by cdjgit's avatar cdjgit

some enhancements

parent 39c4d729
......@@ -31,12 +31,43 @@ namespace policy {
DEFINE_int32(chash_num_replicas, 100,
"default number of replicas per server in chash");
namespace {
class ReplicaPolicy {
public:
ReplicaPolicy() : _hash_func(nullptr) {}
ReplicaPolicy(HashFunc hash) : _hash_func(hash) {}
bool BuildReplicasDefault(const ServerId server,
const size_t num_replicas,
HashFunc hash,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) {
virtual ~ReplicaPolicy();
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const = 0;
static const ReplicaPolicy* GetReplicaPolicy(const std::string& name) {
auto iter = _policy_map.find(name);
if (iter != _policy_map.end()) {
return iter->second;
}
return nullptr;
}
protected:
HashFunc _hash_func = nullptr;
private:
static const std::map<std::string, const ReplicaPolicy*> _policy_map;
};
class DefaultReplicaPolicy : public ReplicaPolicy {
public:
DefaultReplicaPolicy(HashFunc hash) : ReplicaPolicy(hash) {}
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;
};
bool DefaultReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
......@@ -47,7 +78,7 @@ bool BuildReplicasDefault(const ServerId server,
int len = snprintf(host, sizeof(host), "%s-%lu",
endpoint2str(ptr->remote_side()).c_str(), i);
ConsistentHashingLoadBalancer::Node node;
node.hash = hash(host, len);
node.hash = _hash_func(host, len);
node.server_sock = server;
node.server_addr = ptr->remote_side();
replicas->push_back(node);
......@@ -55,9 +86,16 @@ bool BuildReplicasDefault(const ServerId server,
return true;
}
bool BuildReplicasKetam(const ServerId server,
const size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) {
class KetamaReplicaPolicy : public ReplicaPolicy {
public:
virtual bool Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const;
};
bool KetamaReplicaPolicy::Build(ServerId server,
size_t num_replicas,
std::vector<ConsistentHashingLoadBalancer::Node>* replicas) const {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) {
return false;
......@@ -86,35 +124,17 @@ bool BuildReplicasKetam(const ServerId server,
return true;
}
} // namespace
const std::map<std::string, const ReplicaPolicy*> ReplicaPolicy::_policy_map = {
{"murmurhash3", new DefaultReplicaPolicy(MurmurHash32)},
{"md5", new DefaultReplicaPolicy(MD5Hash32)},
{"ketama", new KetamaReplicaPolicy}
};
ConsistentHashingLoadBalancer::ConsistentHashingLoadBalancer(const char* name)
: _num_replicas(FLAGS_chash_num_replicas), _name(name) {
Init(_name);
}
void ConsistentHashingLoadBalancer::Init(const std::string& name) {
if (name == "murmurhash3") {
_build_replicas = std::bind(BuildReplicasDefault,
std::placeholders::_1,
std::placeholders::_2,
MurmurHash32,
std::placeholders::_3);
return;
}
if (name == "md5") {
_build_replicas = std::bind(BuildReplicasDefault,
std::placeholders::_1,
std::placeholders::_2,
MD5Hash32,
std::placeholders::_3);
return;
}
if (name == "ketama") {
_build_replicas = BuildReplicasKetam;
return;
}
CHECK(false) << "Failed to init consistency hash load balancer of \'" << name << '\'';
_replicas_policy = ReplicaPolicy::GetReplicaPolicy(name);
CHECK(_replicas_policy)
<< "Fail to find replica policy for consistency lb: '" << name << '\'';
}
size_t ConsistentHashingLoadBalancer::AddBatch(
......@@ -188,7 +208,7 @@ size_t ConsistentHashingLoadBalancer::Remove(
bool ConsistentHashingLoadBalancer::AddServer(const ServerId& server) {
std::vector<Node> add_nodes;
add_nodes.reserve(_num_replicas);
if (!_build_replicas(server, _num_replicas, &add_nodes)) {
if (!_replicas_policy->Build(server, _num_replicas, &add_nodes)) {
return false;
}
std::sort(add_nodes.begin(), add_nodes.end());
......@@ -207,7 +227,7 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch(
replicas.reserve(_num_replicas);
for (size_t i = 0; i < servers.size(); ++i) {
replicas.clear();
if (_build_replicas(servers[i], _num_replicas, &replicas)) {
if (_replicas_policy->Build(servers[i], _num_replicas, &replicas)) {
add_nodes.insert(add_nodes.end(), replicas.begin(), replicas.end());
}
}
......
......@@ -28,6 +28,8 @@
namespace brpc {
namespace policy {
class ReplicaPolicy;
class ConsistentHashingLoadBalancer : public LoadBalancer {
public:
struct Node {
......@@ -43,8 +45,6 @@ public:
return hash < code;
}
};
using BuildReplicasFunc =
std::function<bool (const ServerId server, const size_t num_replicas, std::vector<Node>* replicas)>;
explicit ConsistentHashingLoadBalancer(const char* name);
bool AddServer(const ServerId& server);
bool RemoveServer(const ServerId& server);
......@@ -57,7 +57,6 @@ public:
virtual bool SetParameters(const butil::StringPiece& params);
private:
void Init(const std::string& name);
void GetLoads(std::map<butil::EndPoint, double> *load_map);
static size_t AddBatch(std::vector<Node> &bg, const std::vector<Node> &fg,
const std::vector<Node> &servers, bool *executed);
......@@ -65,7 +64,7 @@ private:
const std::vector<ServerId> &servers, bool *executed);
static size_t Remove(std::vector<Node> &bg, const std::vector<Node> &fg,
const ServerId& server, bool *executed);
BuildReplicasFunc _build_replicas;
const ReplicaPolicy* _replicas_policy;
size_t _num_replicas;
std::string _name;
butil::DoublyBufferedData<std::vector<Node> > _db_hash_ring;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment