Unverified Commit a12514ba authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #717 from zyearn/revive_from_circuit_breaker

Revive from all server failed
parents cde9a4a0 f3acf3d8
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <vector>
#include <gflags/gflags.h>
#include "brpc/cluster_recover_policy.h"
#include "butil/scoped_lock.h"
#include "butil/synchronization/lock.h"
#include "brpc/server_id.h"
#include "brpc/socket.h"
#include "butil/fast_rand.h"
#include "butil/time.h"
#include "butil/string_splitter.h"
namespace brpc {
DEFINE_int64(detect_available_server_interval_ms, 10, "The interval "
"to detect available server count in DefaultClusterRecoverPolicy");
DefaultClusterRecoverPolicy::DefaultClusterRecoverPolicy(
int64_t min_working_instances, int64_t hold_seconds)
: _recovering(false)
, _min_working_instances(min_working_instances)
, _last_usable(0)
, _last_usable_change_time_ms(0)
, _hold_seconds(hold_seconds)
, _usable_cache(0)
, _usable_cache_time_ms(0) { }
void DefaultClusterRecoverPolicy::StartRecover() {
std::unique_lock<butil::Mutex> mu(_mutex);
_recovering = true;
}
bool DefaultClusterRecoverPolicy::StopRecoverIfNecessary() {
if (!_recovering) {
return false;
}
int64_t now_ms = butil::gettimeofday_ms();
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_seconds)) {
_recovering = false;
_last_usable = 0;
_last_usable_change_time_ms = 0;
mu.unlock();
return false;
}
mu.unlock();
return true;
}
uint64_t DefaultClusterRecoverPolicy::GetUsableServerCount(
int64_t now_ms, const std::vector<ServerId>& server_list) {
if (now_ms - _usable_cache_time_ms < FLAGS_detect_available_server_interval_ms) {
return _usable_cache;
}
uint64_t usable = 0;
size_t n = server_list.size();
SocketUniquePtr ptr;
for (size_t i = 0; i < n; ++i) {
if (Socket::Address(server_list[i].id, &ptr) == 0
&& ptr->IsAvailable()) {
usable++;
}
}
{
std::unique_lock<butil::Mutex> mu(_mutex);
_usable_cache = usable;
_usable_cache_time_ms = now_ms;
}
return _usable_cache;
}
bool DefaultClusterRecoverPolicy::DoReject(const std::vector<ServerId>& server_list) {
if (!_recovering) {
return false;
}
int64_t now_ms = butil::gettimeofday_ms();
uint64_t usable = GetUsableServerCount(now_ms, server_list);
if (_last_usable != usable) {
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable != usable) {
_last_usable = usable;
_last_usable_change_time_ms = now_ms;
}
}
if (butil::fast_rand_less_than(_min_working_instances) >= usable) {
return true;
}
return false;
}
bool GetRecoverPolicyByParams(const butil::StringPiece& params,
std::shared_ptr<ClusterRecoverPolicy>* ptr_out) {
int64_t min_working_instances = -1;
int64_t hold_seconds = -1;
bool has_meet_params = false;
for (butil::KeyValuePairsSplitter sp(params.begin(), params.end(), ' ', '=');
sp; ++sp) {
if (sp.value().empty()) {
LOG(ERROR) << "Empty value for " << sp.key() << " in lb parameter";
return false;
}
if (sp.key() == "min_working_instances") {
if (!butil::StringToInt64(sp.value(), &min_working_instances)) {
return false;
}
has_meet_params = true;
continue;
} else if (sp.key() == "hold_seconds") {
if (!butil::StringToInt64(sp.value(), &hold_seconds)) {
return false;
}
has_meet_params = true;
continue;
}
LOG(ERROR) << "Failed to set this unknown parameters " << sp.key_and_value();
return false;
}
if (min_working_instances > 0 && hold_seconds > 0) {
ptr_out->reset(
new DefaultClusterRecoverPolicy(min_working_instances, hold_seconds));
} else if (has_meet_params) {
// In this case, user set some params but not in the right way, just return
// false to let user take care of this situation.
LOG(ERROR) << "Invalid params=`" << params << "'";
return false;
}
return true;
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_CLUSTER_RECOVER_POLICY
#define BRPC_CLUSTER_RECOVER_POLICY
#include <cstdint>
#include <memory>
#include "butil/synchronization/lock.h"
#include "butil/strings/string_piece.h"
#include "butil/strings/string_number_conversions.h"
namespace brpc {
class ServerId;
// After all servers are down and health check happens, servers are
// online one by one. Once one server is up, all the request that should
// be sent to all servers, would be sent to one server, which may be a
// disastrous behaviour. In the worst case it would cause the server being down
// again if circuit breaker is enabled and the cluster would never recover.
// This class controls the amount of requests that sent to the revived
// servers when recovering from all servers are down.
class ClusterRecoverPolicy {
public:
virtual ~ClusterRecoverPolicy() {}
// Indicate that recover from all server being down is happening.
virtual void StartRecover() = 0;
// Return true if some customized policies are satisfied.
virtual bool DoReject(const std::vector<ServerId>& server_list) = 0;
// Stop recover state and do not reject the request if some condition is
// satisfied. Return true if the current state is still in recovering.
virtual bool StopRecoverIfNecessary() = 0;
};
// The default cluster recover policy. Once no servers are available, recover is start.
// If in recover state, the probability that a request is accepted is q/n, in
// which q is the number of current available server, n is the number of minimum
// working instances setting by user. If q is not changed during a given time,
// hold_seconds, then the cluster is considered recovered and all the request
// would be sent to the current available servers.
class DefaultClusterRecoverPolicy : public ClusterRecoverPolicy {
public:
DefaultClusterRecoverPolicy(int64_t min_working_instances, int64_t hold_seconds);
void StartRecover();
bool DoReject(const std::vector<ServerId>& server_list);
bool StopRecoverIfNecessary();
private:
uint64_t GetUsableServerCount(int64_t now_ms, const std::vector<ServerId>& server_list);
private:
bool _recovering;
int64_t _min_working_instances;
butil::Mutex _mutex;
uint64_t _last_usable;
int64_t _last_usable_change_time_ms;
int64_t _hold_seconds;
uint64_t _usable_cache;
int64_t _usable_cache_time_ms;
};
// Return a DefaultClusterRecoverPolicy object by params.
bool GetRecoverPolicyByParams(const butil::StringPiece& params,
std::shared_ptr<ClusterRecoverPolicy>* ptr_out);
} // namespace brpc
#endif
...@@ -23,6 +23,7 @@ enum Errno { ...@@ -23,6 +23,7 @@ enum Errno {
EUNUSED = 1015; // The socket was not needed EUNUSED = 1015; // The socket was not needed
ESSL = 1016; // SSL related error ESSL = 1016; // SSL related error
EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams
EREJECT = 1018; // The Request is rejected
// Errno caused by server // Errno caused by server
EINTERNAL = 2001; // Internal Server Error EINTERNAL = 2001; // Internal Server Error
......
...@@ -24,9 +24,6 @@ ...@@ -24,9 +24,6 @@
#include "brpc/shared_object.h" // SharedObject #include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId #include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T> #include "brpc/extension.h" // Extension<T>
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
namespace brpc { namespace brpc {
......
...@@ -270,8 +270,7 @@ size_t ConsistentHashingLoadBalancer::RemoveServersInBatch( ...@@ -270,8 +270,7 @@ size_t ConsistentHashingLoadBalancer::RemoveServersInBatch(
return n; return n;
} }
LoadBalancer *ConsistentHashingLoadBalancer::New( LoadBalancer *ConsistentHashingLoadBalancer::New(const butil::StringPiece& params) const {
const butil::StringPiece& params) const {
ConsistentHashingLoadBalancer* lb = ConsistentHashingLoadBalancer* lb =
new (std::nothrow) ConsistentHashingLoadBalancer(_type); new (std::nothrow) ConsistentHashingLoadBalancer(_type);
if (lb && !lb->SetParameters(params)) { if (lb && !lb->SetParameters(params)) {
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include "brpc/reloadable_flags.h" #include "brpc/reloadable_flags.h"
#include "brpc/policy/locality_aware_load_balancer.h" #include "brpc/policy/locality_aware_load_balancer.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -270,7 +269,6 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -270,7 +269,6 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
size_t ntry = 0; size_t ntry = 0;
size_t nloop = 0; size_t nloop = 0;
int64_t total = _total.load(butil::memory_order_relaxed); int64_t total = _total.load(butil::memory_order_relaxed);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "butil/fast_rand.h" #include "butil/fast_rand.h"
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/policy/randomized_load_balancer.h" #include "brpc/policy/randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -110,7 +110,11 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -110,7 +110,11 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
if (_cluster_recover_policy && _cluster_recover_policy->StopRecoverIfNecessary()) {
if (_cluster_recover_policy->DoReject(s->server_list)) {
return EREJECT;
}
}
uint32_t stride = 0; uint32_t stride = 0;
size_t offset = butil::fast_rand_less_than(n); size_t offset = butil::fast_rand_less_than(n);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
...@@ -129,14 +133,22 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -129,14 +133,22 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
// this failed server won't be visited again inside for // this failed server won't be visited again inside for
offset = (offset + stride) % n; offset = (offset + stride) % n;
} }
if (_cluster_recover_policy) {
_cluster_recover_policy->StartRecover();
}
// After we traversed the whole server list, there is still no // After we traversed the whole server list, there is still no
// available server // available server
return EHOSTDOWN; return EHOSTDOWN;
} }
RandomizedLoadBalancer* RandomizedLoadBalancer::New( RandomizedLoadBalancer* RandomizedLoadBalancer::New(
const butil::StringPiece&) const { const butil::StringPiece& params) const {
return new (std::nothrow) RandomizedLoadBalancer; RandomizedLoadBalancer* lb = new (std::nothrow) RandomizedLoadBalancer;
if (lb && !lb->SetParameters(params)) {
delete lb;
lb = NULL;
}
return lb;
} }
void RandomizedLoadBalancer::Destroy() { void RandomizedLoadBalancer::Destroy() {
...@@ -162,5 +174,9 @@ void RandomizedLoadBalancer::Describe( ...@@ -162,5 +174,9 @@ void RandomizedLoadBalancer::Describe(
os << '}'; os << '}';
} }
bool RandomizedLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRecoverPolicyByParams(params, &_cluster_recover_policy);
}
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/cluster_recover_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -45,12 +45,14 @@ private: ...@@ -45,12 +45,14 @@ private:
std::vector<ServerId> server_list; std::vector<ServerId> server_list;
std::map<ServerId, size_t> server_map; std::map<ServerId, size_t> server_map;
}; };
bool SetParameters(const butil::StringPiece& params);
static bool Add(Servers& bg, const ServerId& id); static bool Add(Servers& bg, const ServerId& id);
static bool Remove(Servers& bg, const ServerId& id); static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers> _db_servers; butil::DoublyBufferedData<Servers> _db_servers;
std::shared_ptr<ClusterRecoverPolicy> _cluster_recover_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -110,6 +110,11 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -110,6 +110,11 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
if (_cluster_recover_policy && _cluster_recover_policy->StopRecoverIfNecessary()) {
if (_cluster_recover_policy->DoReject(s->server_list)) {
return EREJECT;
}
}
TLS tls = s.tls(); TLS tls = s.tls();
if (tls.stride == 0) { if (tls.stride == 0) {
tls.stride = GenRandomStride(); tls.stride = GenRandomStride();
...@@ -127,13 +132,21 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -127,13 +132,21 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0; return 0;
} }
} }
if (_cluster_recover_policy) {
_cluster_recover_policy->StartRecover();
}
s.tls() = tls; s.tls() = tls;
return EHOSTDOWN; return EHOSTDOWN;
} }
RoundRobinLoadBalancer* RoundRobinLoadBalancer::New( RoundRobinLoadBalancer* RoundRobinLoadBalancer::New(
const butil::StringPiece&) const { const butil::StringPiece& params) const {
return new (std::nothrow) RoundRobinLoadBalancer; RoundRobinLoadBalancer* lb = new (std::nothrow) RoundRobinLoadBalancer;
if (lb && !lb->SetParameters(params)) {
delete lb;
lb = NULL;
}
return lb;
} }
void RoundRobinLoadBalancer::Destroy() { void RoundRobinLoadBalancer::Destroy() {
...@@ -159,5 +172,9 @@ void RoundRobinLoadBalancer::Describe( ...@@ -159,5 +172,9 @@ void RoundRobinLoadBalancer::Describe(
os << '}'; os << '}';
} }
bool RoundRobinLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRecoverPolicyByParams(params, &_cluster_recover_policy);
}
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/cluster_recover_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -49,12 +49,14 @@ private: ...@@ -49,12 +49,14 @@ private:
uint32_t stride; uint32_t stride;
uint32_t offset; uint32_t offset;
}; };
bool SetParameters(const butil::StringPiece& params);
static bool Add(Servers& bg, const ServerId& id); static bool Add(Servers& bg, const ServerId& id);
static bool Remove(Servers& bg, const ServerId& id); static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers, TLS> _db_servers; butil::DoublyBufferedData<Servers, TLS> _db_servers;
std::shared_ptr<ClusterRecoverPolicy> _cluster_recover_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -22,8 +22,15 @@ ...@@ -22,8 +22,15 @@
#include "brpc/policy/locality_aware_load_balancer.h" #include "brpc/policy/locality_aware_load_balancer.h"
#include "brpc/policy/consistent_hashing_load_balancer.h" #include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h" #include "brpc/policy/hasher.h"
#include "brpc/errno.pb.h"
#include "echo.pb.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
namespace brpc { namespace brpc {
DECLARE_int32(health_check_interval);
DECLARE_int64(detect_available_server_interval_ms);
namespace policy { namespace policy {
extern uint32_t CRCHash32(const char *key, size_t len); extern uint32_t CRCHash32(const char *key, size_t len);
extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len)); extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
...@@ -708,7 +715,6 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) { ...@@ -708,7 +715,6 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
"10.92.115.19:8832", "10.92.115.19:8832",
"10.42.122.201:8833", "10.42.122.201:8833",
}; };
std::vector<brpc::LoadBalancer*> lbs; std::vector<brpc::LoadBalancer*> lbs;
lbs.push_back(new brpc::policy::RoundRobinLoadBalancer); lbs.push_back(new brpc::policy::RoundRobinLoadBalancer);
lbs.push_back(new brpc::policy::RandomizedLoadBalancer); lbs.push_back(new brpc::policy::RandomizedLoadBalancer);
...@@ -782,4 +788,203 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) { ...@@ -782,4 +788,203 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
} }
} }
TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
const char* servers[] = {
"10.92.115.19:8832",
"10.42.122.201:8833",
};
brpc::LoadBalancer* lb = NULL;
int rand = butil::fast_rand_less_than(2);
if (rand == 0) {
brpc::policy::RandomizedLoadBalancer rlb;
lb = rlb.New("min_working_instances=2 hold_seconds=2000");
} else if (rand == 1) {
brpc::policy::RoundRobinLoadBalancer rrlb;
lb = rrlb.New("min_working_instances=2 hold_seconds=2000");
}
brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
brpc::SocketOptions options;
options.remote_side = dummy;
brpc::ServerId id(8888);
id.tag = "50";
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr[i]));
lb->AddServer(id);
}
brpc::SocketUniquePtr sptr;
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&sptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
ptr[0]->SetFailed();
ptr[1]->SetFailed();
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
// should reject all request since there is no available server
for (int i = 0; i < 10; ++i) {
ASSERT_EQ(brpc::EREJECT, lb->SelectServer(in, &out));
}
{
brpc::SocketUniquePtr dummy_ptr;
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
dummy_ptr->Revive();
}
bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
// After one server is revived, the reject rate should be 50%
int num_ereject = 0;
int num_ok = 0;
for (int i = 0; i < 100; ++i) {
int rc = lb->SelectServer(in, &out);
if (rc == brpc::EREJECT) {
num_ereject++;
} else if (rc == 0) {
num_ok++;
} else {
ASSERT_TRUE(false);
}
}
ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
bthread_usleep((2000 /* hold_seconds */ + 10) * 1000);
// After enough waiting time, traffic should be sent to all available servers.
for (int i = 0; i < 10; ++i) {
ASSERT_EQ(0, lb->SelectServer(in, &out));
}
}
class EchoServiceImpl : public test::EchoService {
public:
EchoServiceImpl()
: _num_request(0) {}
virtual ~EchoServiceImpl() {}
virtual void Echo(google::protobuf::RpcController* cntl_base,
const test::EchoRequest* req,
test::EchoResponse* res,
google::protobuf::Closure* done) {
//brpc::Controller* cntl =
// static_cast<brpc::Controller*>(cntl_base);
brpc::ClosureGuard done_guard(done);
int p = _num_request.fetch_add(1, butil::memory_order_relaxed);
// concurrency in normal case is 50
if (p < 70) {
bthread_usleep(100 * 1000);
_num_request.fetch_sub(1, butil::memory_order_relaxed);
res->set_message("OK");
} else {
_num_request.fetch_sub(1, butil::memory_order_relaxed);
bthread_usleep(1000 * 1000);
}
return;
}
butil::atomic<int> _num_request;
};
butil::atomic<int32_t> num_failed(0);
butil::atomic<int32_t> num_reject(0);
class Done : public google::protobuf::Closure {
public:
void Run() {
if (cntl.Failed()) {
num_failed.fetch_add(1, butil::memory_order_relaxed);
if (cntl.ErrorCode() == brpc::EREJECT) {
num_reject.fetch_add(1, butil::memory_order_relaxed);
}
}
delete this;
}
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
};
TEST_F(LoadBalancerTest, invalid_lb_params) {
const char* lb_algo[] = { "random:mi_working_instances=2 hold_seconds=2000",
"rr:min_working_instances=2 hold_secon=2000" };
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), -1);
}
TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
// Those two lines force the interval of first hc to 3s
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "3000");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_min_isolation_duration_ms", "3000");
const char* lb_algo[] = { "random:min_working_instances=2 hold_seconds=2000",
"rr:min_working_instances=2 hold_seconds=2000" };
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
options.timeout_ms = 300;
options.enable_circuit_breaker = true;
// Disable retry to make health check happen one by one
options.max_retry = 0;
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), 0);
test::EchoRequest req;
req.set_message("123");
test::EchoResponse res;
test::EchoService_Stub stub(&channel);
{
// trigger one server to health check
brpc::Controller cntl;
stub.Echo(&cntl, &req, &res, NULL);
}
// This sleep make one server revived 700ms earlier than the other server, which
// can make the server down again if no request limit policy are applied here.
bthread_usleep(700000);
{
// trigger the other server to health check
brpc::Controller cntl;
stub.Echo(&cntl, &req, &res, NULL);
}
butil::EndPoint point(butil::IP_ANY, 7777);
brpc::Server server;
EchoServiceImpl service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(point, NULL));
butil::EndPoint point2(butil::IP_ANY, 7778);
brpc::Server server2;
EchoServiceImpl service2;
ASSERT_EQ(0, server2.AddService(&service2, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server2.Start(point2, NULL));
int64_t start_ms = butil::gettimeofday_ms();
while ((butil::gettimeofday_ms() - start_ms) < 3500) {
Done* done = new Done;
done->req.set_message("123");
stub.Echo(&done->cntl, &done->req, &done->res, done);
bthread_usleep(1000);
}
// All error code should be equal to EREJECT, except when the situation
// all servers are down, the very first call that trigger recovering would
// fail with EHOSTDOWN instead of EREJECT. This is where the number 1 comes
// in following ASSERT.
ASSERT_TRUE(num_failed.load(butil::memory_order_relaxed) -
num_reject.load(butil::memory_order_relaxed) == 1);
num_failed.store(0, butil::memory_order_relaxed);
// should recover now
for (int i = 0; i < 1000; ++i) {
Done* done = new Done;
done->req.set_message("123");
stub.Echo(&done->cntl, &done->req, &done->res, done);
bthread_usleep(1000);
}
bthread_usleep(500000 /* sleep longer than timeout of channel */);
ASSERT_EQ(0, num_failed.load(butil::memory_order_relaxed));
}
} //namespace } //namespace
...@@ -475,54 +475,3 @@ TEST(URITest, query_remover_key_value_not_changed_after_modified_query) { ...@@ -475,54 +475,3 @@ TEST(URITest, query_remover_key_value_not_changed_after_modified_query) {
ASSERT_EQ(qr.value(), "value2"); ASSERT_EQ(qr.value(), "value2");
} }
TEST(URITest, query_splitter_sanity) {
std::string query = "key1=value1&key2=value2&key3=value3";
{
brpc::QuerySplitter qs(query);
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
{
brpc::QuerySplitter qs(query.data(), query.data() + query.size());
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
{
brpc::QuerySplitter qs(query.c_str());
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment