Commit 593082c4 authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: refine comments and UT

parent 57b60aa4
......@@ -129,7 +129,10 @@ struct ChannelOptions {
// Default: ""
std::string connection_group;
// TODO(zhujiashun)
// Customize the revive policy after all servers are shutdown. The
// interface is defined in src/brpc/revive_policy.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
RevivePolicy* revive_policy;
......
......@@ -302,6 +302,20 @@ int ConsistentHashingLoadBalancer::SelectServer(
if (s->empty()) {
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
std::set<ServerId> server_list;
for (auto server: *s) {
server_list.insert(server.server_sock);
}
std::vector<ServerId> server_list_distinct(
server_list.begin(), server_list.end());
if (rp->DoReject(server_list_distinct)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
std::vector<Node>::const_iterator choice =
std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
if (choice == s->end()) {
......@@ -319,6 +333,9 @@ int ConsistentHashingLoadBalancer::SelectServer(
}
}
}
if (rp) {
rp->StartReviving();
}
return EHOSTDOWN;
}
......
......@@ -22,7 +22,7 @@
#include "brpc/socket.h"
#include "brpc/reloadable_flags.h"
#include "brpc/policy/locality_aware_load_balancer.h"
#include "brpc/revive_policy.h"
namespace brpc {
namespace policy {
......@@ -270,7 +270,6 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
if (n == 0) {
return ENODATA;
}
size_t ntry = 0;
size_t nloop = 0;
int64_t total = _total.load(butil::memory_order_relaxed);
......
......@@ -110,9 +110,12 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) {
return ENODATA;
}
if (in.revive_policy &&
(in.revive_policy)->RejectDuringReviving(s->server_list)) {
return EREJECT;
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp->DoReject(s->server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
uint32_t stride = 0;
size_t offset = butil::fast_rand_less_than(n);
......@@ -132,8 +135,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
// this failed server won't be visited again inside for
offset = (offset + stride) % n;
}
if (in.revive_policy) {
in.revive_policy->StartRevive();
if (rp) {
rp->StartReviving();
}
// After we traversed the whole server list, there is still no
// available server
......
......@@ -31,12 +31,6 @@ namespace policy {
// than RoundRobinLoadBalancer.
class RandomizedLoadBalancer : public LoadBalancer {
public:
RandomizedLoadBalancer()
: _reviving(false)
//TODO(zhujiashun)
, _minimum_working_instances(2)
, _last_usable(0)
, _last_usable_change_time_ms(0) {}
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
......@@ -57,13 +51,6 @@ private:
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers> _db_servers;
bool _reviving;
int64_t _minimum_working_instances;
// TODO(zhujiashun): remove mutex
butil::Mutex _mutex;
int64_t _last_usable;
int64_t _last_usable_change_time_ms;
};
} // namespace policy
......
......@@ -110,9 +110,12 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) {
return ENODATA;
}
if (in.revive_policy &&
(in.revive_policy)->RejectDuringReviving(s->server_list)) {
return EREJECT;
RevivePolicy* rp = in.revive_policy;
if (rp) {
if (rp->DoReject(s->server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
TLS tls = s.tls();
if (tls.stride == 0) {
......@@ -131,8 +134,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0;
}
}
if (in.revive_policy) {
in.revive_policy->StartRevive();
if (rp) {
rp->StartReviving();
}
s.tls() = tls;
return EHOSTDOWN;
......
......@@ -20,6 +20,7 @@
#include "brpc/socket.h"
#include "brpc/policy/weighted_round_robin_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/revive_policy.h"
namespace {
......@@ -157,6 +158,18 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (s->server_list.empty()) {
return ENODATA;
}
RevivePolicy* rp = in.revive_policy;
if (rp) {
std::vector<ServerId> server_list;
server_list.reserve(s->server_list.size());
for (auto server: s->server_list) {
server_list.emplace_back(server.id);
}
if (rp->DoReject(server_list)) {
return EREJECT;
}
rp->StopRevivingIfNecessary();
}
TLS& tls = s.tls();
if (tls.IsNeededCaculateNewStride(s->weight_sum, s->server_list.size())) {
if (tls.stride == 0) {
......@@ -198,6 +211,9 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
tls_temp.remain_server = tls.remain_server;
}
}
if (rp) {
rp->StartReviving();
}
return EHOSTDOWN;
}
......
......@@ -34,45 +34,55 @@ DefaultRevivePolicy::DefaultRevivePolicy(
, _hold_time_ms(hold_time_ms) { }
void DefaultRevivePolicy::StartRevive() {
void DefaultRevivePolicy::StartReviving() {
std::unique_lock<butil::Mutex> mu(_mutex);
_reviving = true;
}
bool DefaultRevivePolicy::RejectDuringReviving(
const std::vector<ServerId>& server_list) {
if (!_reviving) {
return false;
void DefaultRevivePolicy::StopRevivingIfNecessary() {
int64_t now_ms = butil::gettimeofday_ms();
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_time_ms)) {
_reviving = false;
_last_usable_change_time_ms = 0;
}
}
return;
}
bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) {
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (!_reviving) {
mu.unlock();
return false;
}
}
size_t n = server_list.size();
int usable = 0;
// TODO(zhujiashun): optimize looking process
SocketUniquePtr ptr;
// TODO(zhujiashun): optimize O(N)
for (size_t i = 0; i < n; ++i) {
if (Socket::Address(server_list[i].id, &ptr) == 0
&& !ptr->IsLogOff()) {
usable++;
}
}
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && usable != 0 &&
(butil::gettimeofday_ms() - _last_usable_change_time_ms > _hold_time_ms)
&& _last_usable == usable) {
_reviving = false;
_last_usable_change_time_ms = 0;
mu.unlock();
} else {
int64_t now_ms = butil::gettimeofday_ms();
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable != usable) {
_last_usable = usable;
_last_usable_change_time_ms = butil::gettimeofday_ms();
}
mu.unlock();
int rand = butil::fast_rand_less_than(_minimum_working_instances);
if (rand >= usable) {
return true;
_last_usable_change_time_ms = now_ms;
}
}
int rand = butil::fast_rand_less_than(_minimum_working_instances);
if (rand >= usable) {
return true;
}
return false;
}
} // namespace brpc
......@@ -23,20 +23,40 @@
namespace brpc {
class ServerId;
// After all servers are shutdown and health check happens, servers are
// online one by one. Once one server is up, all the request that should
// be sent to all servers, would be sent to one server, which may be a
// disastrous behaviour. In the worst case it would cause the server shutdown
// again if circuit breaker is enabled and the server cluster would never
// recover. This class controls the amount of requests that sent to the revived
// servers when recovering from all servers are shutdown.
class RevivePolicy {
public:
// TODO(zhujiashun):
// Indicate that reviving from the shutdown of all server is happening.
virtual void StartReviving() = 0;
// Return true if some customized policies are satisfied.
virtual bool DoReject(const std::vector<ServerId>& server_list) = 0;
virtual void StartRevive() = 0;
virtual bool RejectDuringReviving(const std::vector<ServerId>& server_list) = 0;
// Stop reviving state and do not reject the request if some condition is
// satisfied.
virtual void StopRevivingIfNecessary() = 0;
};
// The default revive policy. Once no servers are available, reviving is start.
// If in reviving state, the probability that a request is accepted is q/n, in
// which q is the number of current available server, n is the number of minimum
// working instances setting by user. If q is not changed during a given time,
// hold_time_ms, then the cluster is considered recovered and all the request
// would be sent to the current available servers.
class DefaultRevivePolicy : public RevivePolicy {
public:
DefaultRevivePolicy(int64_t minimum_working_instances, int64_t hold_time_ms);
void StartRevive() override;
bool RejectDuringReviving(const std::vector<ServerId>& server_list) override;
void StartReviving();
bool DoReject(const std::vector<ServerId>& server_list);
void StopRevivingIfNecessary();
private:
bool _reviving;
......
......@@ -884,6 +884,7 @@ public:
num_reject->fetch_add(1, butil::memory_order_relaxed);
}
}
delete this;
}
brpc::Controller cntl;
......@@ -898,24 +899,22 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "5000");
char* lb_algo[] = { "rr" , "random" };
const char* lb_algo[] = { "rr" , "random", "wrr", "c_murmurhash" };
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
options.timeout_ms = 300;
options.enable_circuit_breaker = true;
options.revive_policy = new brpc::DefaultRevivePolicy(2, 2000 /*2s*/);
// Set max_retry to 0 so that health check of servers
// Set max_retry to 0 so that the time of health check of different servers
// are not continuous.
options.max_retry = 0;
ASSERT_EQ(channel.Init("list://127.0.0.1:7777,127.0.0.1:7778",
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50,127.0.0.1:7778 50",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), 0);
uint64_t request_code = 0;
test::EchoRequest req;
req.set_message("123");
test::EchoResponse res;
......@@ -923,12 +922,14 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
// trigger one server to health check
{
brpc::Controller cntl;
cntl.set_request_code(brpc::policy::MurmurHash32(&++request_code, 8));
stub.Echo(&cntl, &req, &res, NULL);
}
bthread_usleep(500000);
// trigger the other server to health check
{
brpc::Controller cntl;
cntl.set_request_code(brpc::policy::MurmurHash32(&++request_code, 8));
stub.Echo(&cntl, &req, &res, NULL);
}
bthread_usleep(500000);
......@@ -947,14 +948,18 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
int64_t start_ms = butil::gettimeofday_ms();
butil::atomic<int32_t> num_reject(0);
int64_t q = 0;
while ((butil::gettimeofday_ms() - start_ms) <
brpc::FLAGS_health_check_interval * 1000 + 10) {
Done* done = new Done;
done->num_reject = &num_reject;
done->req.set_message("123");
done->cntl.set_request_code(brpc::policy::MurmurHash32(&++request_code, 8));
stub.Echo(&done->cntl, &done->req, &done->res, done);
q++;
bthread_usleep(1000);
}
ASSERT_TRUE(num_reject.load(butil::memory_order_relaxed) > 1700);
// should recover now
butil::atomic<int32_t> num_failed(0);
......@@ -962,12 +967,12 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
Done* done = new Done;
done->req.set_message("123");
done->num_failed = &num_failed;
done->cntl.set_request_code(brpc::policy::MurmurHash32(&++request_code, 8));
stub.Echo(&done->cntl, &done->req, &done->res, done);
bthread_usleep(1000);
}
bthread_usleep(1050*1000 /* sleep longer than timeout of service */);
ASSERT_EQ(0, num_failed.load(butil::memory_order_relaxed));
ASSERT_TRUE(num_reject.load(butil::memory_order_relaxed) > 1500);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment