Commit d8fa3d51 authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: change delim order in Splitter & enhance UT & replace…

revived_from_all_failed: change delim order in Splitter & enhance UT & replace SplitStringIntoKeyValuePairs
parent 2c820f8f
...@@ -31,12 +31,12 @@ DEFINE_int64(detect_available_server_interval_ms, 10, "The interval " ...@@ -31,12 +31,12 @@ DEFINE_int64(detect_available_server_interval_ms, 10, "The interval "
"to detect available server count in DefaultClusterRecoverPolicy"); "to detect available server count in DefaultClusterRecoverPolicy");
DefaultClusterRecoverPolicy::DefaultClusterRecoverPolicy( DefaultClusterRecoverPolicy::DefaultClusterRecoverPolicy(
int64_t minimum_working_instances, int64_t hold_time_ms) int64_t min_working_instances, int64_t hold_seconds)
: _recovering(false) : _recovering(false)
, _minimum_working_instances(minimum_working_instances) , _min_working_instances(min_working_instances)
, _last_usable(0) , _last_usable(0)
, _last_usable_change_time_ms(0) , _last_usable_change_time_ms(0)
, _hold_time_ms(hold_time_ms) , _hold_seconds(hold_seconds)
, _usable_cache(0) , _usable_cache(0)
, _usable_cache_time_ms(0) { } , _usable_cache_time_ms(0) { }
...@@ -52,7 +52,7 @@ bool DefaultClusterRecoverPolicy::StopRecoverIfNecessary() { ...@@ -52,7 +52,7 @@ bool DefaultClusterRecoverPolicy::StopRecoverIfNecessary() {
int64_t now_ms = butil::gettimeofday_ms(); int64_t now_ms = butil::gettimeofday_ms();
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && _last_usable != 0 && if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_time_ms)) { (now_ms - _last_usable_change_time_ms > _hold_seconds)) {
_recovering = false; _recovering = false;
_last_usable = 0; _last_usable = 0;
_last_usable_change_time_ms = 0; _last_usable_change_time_ms = 0;
...@@ -99,7 +99,7 @@ bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_l ...@@ -99,7 +99,7 @@ bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_l
_last_usable_change_time_ms = now_ms; _last_usable_change_time_ms = now_ms;
} }
} }
if (butil::fast_rand_less_than(_minimum_working_instances) >= usable) { if (butil::fast_rand_less_than(_min_working_instances) >= usable) {
return true; return true;
} }
return false; return false;
...@@ -107,23 +107,23 @@ bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_l ...@@ -107,23 +107,23 @@ bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_l
bool GetRecoverPolicyByParams(const butil::StringPiece& params, bool GetRecoverPolicyByParams(const butil::StringPiece& params,
std::shared_ptr<ClusterRecoverPolicy>* ptr_out) { std::shared_ptr<ClusterRecoverPolicy>* ptr_out) {
int64_t minimum_working_instances = -1; int64_t min_working_instances = -1;
int64_t hold_time_ms = -1; int64_t hold_seconds = -1;
bool has_meet_params = false; bool has_meet_params = false;
for (butil::KeyValuePairsSplitter sp(params.begin(), params.end(), '=', ' '); for (butil::KeyValuePairsSplitter sp(params.begin(), params.end(), ' ', '=');
sp; ++sp) { sp; ++sp) {
if (sp.value().empty()) { if (sp.value().empty()) {
LOG(ERROR) << "Empty value for " << sp.key() << " in lb parameter"; LOG(ERROR) << "Empty value for " << sp.key() << " in lb parameter";
return false; return false;
} }
if (sp.key() == "minimum_working_instances") { if (sp.key() == "min_working_instances") {
if (!butil::StringToInt64(sp.value(), &minimum_working_instances)) { if (!butil::StringToInt64(sp.value(), &min_working_instances)) {
return false; return false;
} }
has_meet_params = true; has_meet_params = true;
continue; continue;
} else if (sp.key() == "hold_time_ms") { } else if (sp.key() == "hold_seconds") {
if (!butil::StringToInt64(sp.value(), &hold_time_ms)) { if (!butil::StringToInt64(sp.value(), &hold_seconds)) {
return false; return false;
} }
has_meet_params = true; has_meet_params = true;
...@@ -131,9 +131,9 @@ bool GetRecoverPolicyByParams(const butil::StringPiece& params, ...@@ -131,9 +131,9 @@ bool GetRecoverPolicyByParams(const butil::StringPiece& params,
} }
LOG(ERROR) << "Failed to set this unknown parameters " << sp.key_and_value(); LOG(ERROR) << "Failed to set this unknown parameters " << sp.key_and_value();
} }
if (minimum_working_instances > 0 && hold_time_ms > 0) { if (min_working_instances > 0 && hold_seconds > 0) {
ptr_out->reset( ptr_out->reset(
new DefaultClusterRecoverPolicy(minimum_working_instances, hold_time_ms)); new DefaultClusterRecoverPolicy(min_working_instances, hold_seconds));
} else if (has_meet_params) { } else if (has_meet_params) {
// In this case, user set some params but not in the right way, just return // In this case, user set some params but not in the right way, just return
// false to let user take care of this situation. // false to let user take care of this situation.
......
...@@ -53,11 +53,11 @@ public: ...@@ -53,11 +53,11 @@ public:
// If in recover state, the probability that a request is accepted is q/n, in // If in recover state, the probability that a request is accepted is q/n, in
// which q is the number of current available server, n is the number of minimum // which q is the number of current available server, n is the number of minimum
// working instances setting by user. If q is not changed during a given time, // working instances setting by user. If q is not changed during a given time,
// hold_time_ms, then the cluster is considered recovered and all the request // hold_seconds, then the cluster is considered recovered and all the request
// would be sent to the current available servers. // would be sent to the current available servers.
class DefaultClusterRecoverPolicy : public ClusterRecoverPolicy { class DefaultClusterRecoverPolicy : public ClusterRecoverPolicy {
public: public:
DefaultClusterRecoverPolicy(int64_t minimum_working_instances, int64_t hold_time_ms); DefaultClusterRecoverPolicy(int64_t min_working_instances, int64_t hold_seconds);
void StartRecover(); void StartRecover();
bool DoReject(const std::vector<ServerId>& server_list); bool DoReject(const std::vector<ServerId>& server_list);
...@@ -68,11 +68,11 @@ private: ...@@ -68,11 +68,11 @@ private:
private: private:
bool _recovering; bool _recovering;
int64_t _minimum_working_instances; int64_t _min_working_instances;
butil::Mutex _mutex; butil::Mutex _mutex;
uint64_t _last_usable; uint64_t _last_usable;
int64_t _last_usable_change_time_ms; int64_t _last_usable_change_time_ms;
int64_t _hold_time_ms; int64_t _hold_seconds;
uint64_t _usable_cache; uint64_t _usable_cache;
int64_t _usable_cache_time_ms; int64_t _usable_cache_time_ms;
}; };
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "butil/containers/flat_map.h" #include "butil/containers/flat_map.h"
#include "butil/errno.h" #include "butil/errno.h"
#include "butil/strings/string_number_conversions.h" #include "butil/strings/string_number_conversions.h"
#include "butil/strings/string_split.h"
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/policy/consistent_hashing_load_balancer.h" #include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h" #include "brpc/policy/hasher.h"
......
...@@ -797,10 +797,10 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { ...@@ -797,10 +797,10 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
int rand = butil::fast_rand_less_than(2); int rand = butil::fast_rand_less_than(2);
if (rand == 0) { if (rand == 0) {
brpc::policy::RandomizedLoadBalancer rlb; brpc::policy::RandomizedLoadBalancer rlb;
lb = rlb.New("minimum_working_instances=2 hold_time_ms=2000"); lb = rlb.New("min_working_instances=2 hold_seconds=2000");
} else if (rand == 1) { } else if (rand == 1) {
brpc::policy::RoundRobinLoadBalancer rrlb; brpc::policy::RoundRobinLoadBalancer rrlb;
lb = rrlb.New("minimum_working_instances=2 hold_time_ms=2000"); lb = rrlb.New("min_working_instances=2 hold_seconds=2000");
} }
brpc::SocketUniquePtr ptr[2]; brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) { for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
...@@ -846,7 +846,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { ...@@ -846,7 +846,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
} }
} }
ASSERT_TRUE(abs(num_ereject - num_ok) < 30); ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
bthread_usleep((2000 /* hold_time_ms */ + 10) * 1000); bthread_usleep((2000 /* hold_seconds */ + 10) * 1000);
// After enough waiting time, traffic should be sent to all available servers. // After enough waiting time, traffic should be sent to all available servers.
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
...@@ -901,6 +901,17 @@ public: ...@@ -901,6 +901,17 @@ public:
test::EchoResponse res; test::EchoResponse res;
}; };
TEST_F(LoadBalancerTest, invalid_lb_params) {
const char* lb_algo[] = { "random:mi_working_instances=2 hold_seconds=2000",
"rr:min_working_instances=2 hold_secon=2000" };
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), -1);
}
TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) { TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
...@@ -908,8 +919,8 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) { ...@@ -908,8 +919,8 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "3000"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "3000");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_min_isolation_duration_ms", "3000"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_min_isolation_duration_ms", "3000");
const char* lb_algo[] = { "random:minimum_working_instances=2 hold_time_ms=2000", const char* lb_algo[] = { "random:min_working_instances=2 hold_seconds=2000",
"rr:minimum_working_instances=2 hold_time_ms=2000" }; "rr:min_working_instances=2 hold_seconds=2000" };
brpc::Channel channel; brpc::Channel channel;
brpc::ChannelOptions options; brpc::ChannelOptions options;
options.protocol = "http"; options.protocol = "http";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment