Commit 2b550fd0 authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: remove RevivePolicy to ClusterRecoverPolicy

parent a1c05b58
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include <vector> #include <vector>
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "brpc/revive_policy.h" #include "brpc/cluster_recover_policy.h"
#include "butil/scoped_lock.h" #include "butil/scoped_lock.h"
#include "butil/synchronization/lock.h" #include "butil/synchronization/lock.h"
#include "brpc/server_id.h" #include "brpc/server_id.h"
...@@ -28,11 +28,11 @@ ...@@ -28,11 +28,11 @@
namespace brpc { namespace brpc {
DEFINE_int64(detect_available_server_interval_ms, 10, "The interval " DEFINE_int64(detect_available_server_interval_ms, 10, "The interval "
"to detect available server count in DefaultRevivePolicy"); "to detect available server count in DefaultClusterRecoverPolicy");
DefaultRevivePolicy::DefaultRevivePolicy( DefaultClusterRecoverPolicy::DefaultClusterRecoverPolicy(
int64_t minimum_working_instances, int64_t hold_time_ms) int64_t minimum_working_instances, int64_t hold_time_ms)
: _reviving(false) : _recovering(false)
, _minimum_working_instances(minimum_working_instances) , _minimum_working_instances(minimum_working_instances)
, _last_usable(0) , _last_usable(0)
, _last_usable_change_time_ms(0) , _last_usable_change_time_ms(0)
...@@ -40,20 +40,20 @@ DefaultRevivePolicy::DefaultRevivePolicy( ...@@ -40,20 +40,20 @@ DefaultRevivePolicy::DefaultRevivePolicy(
, _usable_cache(0) , _usable_cache(0)
, _usable_cache_time_ms(0) { } , _usable_cache_time_ms(0) { }
void DefaultRevivePolicy::StartReviving() { void DefaultClusterRecoverPolicy::StartRecover() {
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
_reviving = true; _recovering = true;
} }
bool DefaultRevivePolicy::StopRevivingIfNecessary() { bool DefaultClusterRecoverPolicy::StopRecoverIfNecessary() {
if (!_reviving) { if (!_recovering) {
return false; return false;
} }
int64_t now_ms = butil::gettimeofday_ms(); int64_t now_ms = butil::gettimeofday_ms();
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && _last_usable != 0 && if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_time_ms)) { (now_ms - _last_usable_change_time_ms > _hold_time_ms)) {
_reviving = false; _recovering = false;
_last_usable_change_time_ms = 0; _last_usable_change_time_ms = 0;
mu.unlock(); mu.unlock();
return false; return false;
...@@ -62,7 +62,7 @@ bool DefaultRevivePolicy::StopRevivingIfNecessary() { ...@@ -62,7 +62,7 @@ bool DefaultRevivePolicy::StopRevivingIfNecessary() {
return true; return true;
} }
int DefaultRevivePolicy::GetUsableServerCount( int DefaultClusterRecoverPolicy::GetUsableServerCount(
int64_t now_ms, const std::vector<ServerId>& server_list) { int64_t now_ms, const std::vector<ServerId>& server_list) {
if (now_ms - _usable_cache_time_ms < FLAGS_detect_available_server_interval_ms) { if (now_ms - _usable_cache_time_ms < FLAGS_detect_available_server_interval_ms) {
return _usable_cache; return _usable_cache;
...@@ -85,8 +85,8 @@ int DefaultRevivePolicy::GetUsableServerCount( ...@@ -85,8 +85,8 @@ int DefaultRevivePolicy::GetUsableServerCount(
} }
bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) { bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_list) {
if (!_reviving) { if (!_recovering) {
return false; return false;
} }
int64_t now_ms = butil::gettimeofday_ms(); int64_t now_ms = butil::gettimeofday_ms();
...@@ -105,8 +105,8 @@ bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) { ...@@ -105,8 +105,8 @@ bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) {
return false; return false;
} }
bool GetRevivePolicyByParams(const butil::StringPiece& params, bool GetRecoverPolicyByParams(const butil::StringPiece& params,
std::shared_ptr<RevivePolicy>* ptr_out) { std::shared_ptr<ClusterRecoverPolicy>* ptr_out) {
int64_t minimum_working_instances = -1; int64_t minimum_working_instances = -1;
int64_t hold_time_ms = -1; int64_t hold_time_ms = -1;
bool has_meet_params = false; bool has_meet_params = false;
...@@ -133,7 +133,7 @@ bool GetRevivePolicyByParams(const butil::StringPiece& params, ...@@ -133,7 +133,7 @@ bool GetRevivePolicyByParams(const butil::StringPiece& params,
} }
if (minimum_working_instances > 0 && hold_time_ms > 0) { if (minimum_working_instances > 0 && hold_time_ms > 0) {
ptr_out->reset( ptr_out->reset(
new DefaultRevivePolicy(minimum_working_instances, hold_time_ms)); new DefaultClusterRecoverPolicy(minimum_working_instances, hold_time_ms));
} else if (has_meet_params) { } else if (has_meet_params) {
// In this case, user set some params but not in the right way, just return // In this case, user set some params but not in the right way, just return
// false to let user take care of this situation. // false to let user take care of this situation.
...@@ -143,5 +143,4 @@ bool GetRevivePolicyByParams(const butil::StringPiece& params, ...@@ -143,5 +143,4 @@ bool GetRevivePolicyByParams(const butil::StringPiece& params,
return true; return true;
} }
} // namespace brpc } // namespace brpc
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
// Authors: Jiashun Zhu(zhujiashun@bilibili.com) // Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_REVIVE_POLICY #ifndef BRPC_CLUSTER_RECOVER_POLICY
#define BRPC_REVIVE_POLICY #define BRPC_CLUSTER_RECOVER_POLICY
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
...@@ -27,46 +27,45 @@ namespace brpc { ...@@ -27,46 +27,45 @@ namespace brpc {
class ServerId; class ServerId;
// After all servers are shutdown and health check happens, servers are // After all servers are down and health check happens, servers are
// online one by one. Once one server is up, all the request that should // online one by one. Once one server is up, all the request that should
// be sent to all servers, would be sent to one server, which may be a // be sent to all servers, would be sent to one server, which may be a
// disastrous behaviour. In the worst case it would cause the server shutdown // disastrous behaviour. In the worst case it would cause the server being down
// again if circuit breaker is enabled and the server cluster would never // again if circuit breaker is enabled and the cluster would never recover.
// recover. This class controls the amount of requests that sent to the revived // This class controls the amount of requests that sent to the revived
// servers when recovering from all servers are shutdown. // servers when recovering from all servers are down.
class RevivePolicy { class ClusterRecoverPolicy {
public: public:
// Indicate that reviving from the shutdown of all server is happening. // Indicate that recover from all server being down is happening.
virtual void StartReviving() = 0; virtual void StartRecover() = 0;
// Return true if some customized policies are satisfied. // Return true if some customized policies are satisfied.
virtual bool DoReject(const std::vector<ServerId>& server_list) = 0; virtual bool DoReject(const std::vector<ServerId>& server_list) = 0;
// Stop reviving state and do not reject the request if some condition is // Stop recover state and do not reject the request if some condition is
// satisfied. // satisfied. Return true if the current state is still in recovering.
// Return true if the current state is still in reviving. virtual bool StopRecoverIfNecessary() = 0;
virtual bool StopRevivingIfNecessary() = 0;
}; };
// The default revive policy. Once no servers are available, reviving is start. // The default cluster recover policy. Once no servers are available, recover is start.
// If in reviving state, the probability that a request is accepted is q/n, in // If in recover state, the probability that a request is accepted is q/n, in
// which q is the number of current available server, n is the number of minimum // which q is the number of current available server, n is the number of minimum
// working instances setting by user. If q is not changed during a given time, // working instances setting by user. If q is not changed during a given time,
// hold_time_ms, then the cluster is considered recovered and all the request // hold_time_ms, then the cluster is considered recovered and all the request
// would be sent to the current available servers. // would be sent to the current available servers.
class DefaultRevivePolicy : public RevivePolicy { class DefaultClusterRecoverPolicy : public ClusterRecoverPolicy {
public: public:
DefaultRevivePolicy(int64_t minimum_working_instances, int64_t hold_time_ms); DefaultClusterRecoverPolicy(int64_t minimum_working_instances, int64_t hold_time_ms);
void StartReviving(); void StartRecover();
bool DoReject(const std::vector<ServerId>& server_list); bool DoReject(const std::vector<ServerId>& server_list);
bool StopRevivingIfNecessary(); bool StopRecoverIfNecessary();
private: private:
int GetUsableServerCount(int64_t now_ms, const std::vector<ServerId>& server_list); int GetUsableServerCount(int64_t now_ms, const std::vector<ServerId>& server_list);
private: private:
bool _reviving; bool _recovering;
int64_t _minimum_working_instances; int64_t _minimum_working_instances;
butil::Mutex _mutex; butil::Mutex _mutex;
int64_t _last_usable; int64_t _last_usable;
...@@ -76,10 +75,10 @@ private: ...@@ -76,10 +75,10 @@ private:
int64_t _usable_cache_time_ms; int64_t _usable_cache_time_ms;
}; };
// Return a DefaultRevivePolicy object by params. The caller is responsible // Return a DefaultClusterRecoverPolicy object by params. The caller is responsible
// for memory management of the return value. // for memory management of the return value.
bool GetRevivePolicyByParams(const butil::StringPiece& params, bool GetRecoverPolicyByParams(const butil::StringPiece& params,
std::shared_ptr<RevivePolicy>* ptr_out); std::shared_ptr<ClusterRecoverPolicy>* ptr_out);
} // namespace brpc } // namespace brpc
......
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/reloadable_flags.h" #include "brpc/reloadable_flags.h"
#include "brpc/policy/locality_aware_load_balancer.h" #include "brpc/policy/locality_aware_load_balancer.h"
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
......
...@@ -32,7 +32,7 @@ inline uint32_t GenRandomStride() { ...@@ -32,7 +32,7 @@ inline uint32_t GenRandomStride() {
} }
RandomizedLoadBalancer::RandomizedLoadBalancer() RandomizedLoadBalancer::RandomizedLoadBalancer()
: _revive_policy(NULL) : _cluster_recover_policy(NULL)
{} {}
bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
...@@ -114,8 +114,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -114,8 +114,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
if (_revive_policy && _revive_policy->StopRevivingIfNecessary()) { if (_cluster_recover_policy && _cluster_recover_policy->StopRecoverIfNecessary()) {
if (_revive_policy->DoReject(s->server_list)) { if (_cluster_recover_policy->DoReject(s->server_list)) {
return EREJECT; return EREJECT;
} }
} }
...@@ -137,8 +137,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -137,8 +137,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
// this failed server won't be visited again inside for // this failed server won't be visited again inside for
offset = (offset + stride) % n; offset = (offset + stride) % n;
} }
if (_revive_policy) { if (_cluster_recover_policy) {
_revive_policy->StartReviving(); _cluster_recover_policy->StartRecover();
} }
// After we traversed the whole server list, there is still no // After we traversed the whole server list, there is still no
// available server // available server
...@@ -179,7 +179,7 @@ void RandomizedLoadBalancer::Describe( ...@@ -179,7 +179,7 @@ void RandomizedLoadBalancer::Describe(
} }
bool RandomizedLoadBalancer::SetParameters(const butil::StringPiece& params) { bool RandomizedLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRevivePolicyByParams(params, &_revive_policy); return GetRecoverPolicyByParams(params, &_cluster_recover_policy);
} }
} // namespace policy } // namespace policy
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/revive_policy.h" #include "brpc/cluster_recover_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -53,7 +53,7 @@ private: ...@@ -53,7 +53,7 @@ private:
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers> _db_servers; butil::DoublyBufferedData<Servers> _db_servers;
std::shared_ptr<RevivePolicy> _revive_policy; std::shared_ptr<ClusterRecoverPolicy> _cluster_recover_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -110,8 +110,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -110,8 +110,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
if (_revive_policy && _revive_policy->StopRevivingIfNecessary()) { if (_cluster_recover_policy && _cluster_recover_policy->StopRecoverIfNecessary()) {
if (_revive_policy->DoReject(s->server_list)) { if (_cluster_recover_policy->DoReject(s->server_list)) {
return EREJECT; return EREJECT;
} }
} }
...@@ -132,8 +132,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -132,8 +132,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0; return 0;
} }
} }
if (_revive_policy) { if (_cluster_recover_policy) {
_revive_policy->StartReviving(); _cluster_recover_policy->StartRecover();
} }
s.tls() = tls; s.tls() = tls;
return EHOSTDOWN; return EHOSTDOWN;
...@@ -173,9 +173,8 @@ void RoundRobinLoadBalancer::Describe( ...@@ -173,9 +173,8 @@ void RoundRobinLoadBalancer::Describe(
} }
bool RoundRobinLoadBalancer::SetParameters(const butil::StringPiece& params) { bool RoundRobinLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRevivePolicyByParams(params, &_revive_policy); return GetRecoverPolicyByParams(params, &_cluster_recover_policy);
} }
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/revive_policy.h" #include "brpc/cluster_recover_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -56,7 +56,7 @@ private: ...@@ -56,7 +56,7 @@ private:
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers, TLS> _db_servers; butil::DoublyBufferedData<Servers, TLS> _db_servers;
std::shared_ptr<RevivePolicy> _revive_policy; std::shared_ptr<ClusterRecoverPolicy> _cluster_recover_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/policy/weighted_round_robin_load_balancer.h" #include "brpc/policy/weighted_round_robin_load_balancer.h"
#include "butil/strings/string_number_conversions.h" #include "butil/strings/string_number_conversions.h"
#include "brpc/revive_policy.h"
namespace { namespace {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment