Commit 57b60aa4 authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: add RevivePolicy

parent acce7799
......@@ -52,6 +52,7 @@ ChannelOptions::ChannelOptions()
, auth(NULL)
, retry_policy(NULL)
, ns_filter(NULL)
, revive_policy(NULL)
{}
ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
......@@ -525,6 +526,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
} else {
cntl->_deadline_us = -1;
}
cntl->_revive_policy = _options.revive_policy;
cntl->IssueRPC(start_send_real_us);
if (done == NULL) {
......@@ -562,7 +564,7 @@ int Channel::CheckHealth() {
return -1;
} else {
SocketUniquePtr tmp_sock;
LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL, NULL };
LoadBalancer::SelectOut sel_out(&tmp_sock);
return _lb->SelectServer(sel_in, &sel_out);
}
......
......@@ -34,6 +34,7 @@
#include "brpc/details/profiler_linker.h"
#include "brpc/retry_policy.h"
#include "brpc/naming_service_filter.h"
#include "brpc/revive_policy.h"
namespace brpc {
......@@ -128,6 +129,10 @@ struct ChannelOptions {
// Default: ""
std::string connection_group;
// TODO(zhujiashun)
// Default: NULL
RevivePolicy* revive_policy;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases.
......
......@@ -18,6 +18,8 @@
#include <gflags/gflags.h>
#include <butil/time.h>
#include "brpc/circuit_breaker.h"
#include "brpc/errno.pb.h"
#include "butil/logging.h"
namespace brpc {
......
......@@ -252,6 +252,7 @@ void Controller::ResetPods() {
_request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL;
_revive_policy = NULL;
}
Controller::Call::Call(Controller::Call* rhs)
......@@ -996,7 +997,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
} else {
LoadBalancer::SelectIn sel_in =
{ start_realtime_us, true,
has_request_code(), _request_code, _accessed };
has_request_code(), _request_code, _accessed, _revive_policy };
LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) {
......
......@@ -41,6 +41,7 @@
#include "brpc/progressive_attachment.h" // ProgressiveAttachment
#include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/grpc.h"
#include "brpc/revive_policy.h"
// EAUTH is defined in MAC
#ifndef EAUTH
......@@ -715,6 +716,7 @@ private:
uint64_t _request_code;
SocketId _single_server_id;
butil::intrusive_ptr<SharedLoadBalancer> _lb;
RevivePolicy* _revive_policy;
// for passing parameters to created bthread, don't modify it otherwhere.
CompletionInfo _tmp_completion_info;
......
......@@ -42,12 +42,14 @@ public:
struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
, log_succeed_without_server(true) {}
, log_succeed_without_server(true)
, minimum_working_instances(-1) {}
bool succeed_without_server;
bool log_succeed_without_server;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
int64_t minimum_working_instances;
};
// A dedicated thread to map a name to ServerIds
......
......@@ -23,6 +23,7 @@ enum Errno {
EUNUSED = 1015; // The socket was not needed
ESSL = 1016; // SSL related error
EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams
EREJECT = 1018; // The Request is rejected
// Errno caused by server
EINTERNAL = 2001; // Internal Server Error
......
......@@ -24,9 +24,7 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T>
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
#include "brpc/revive_policy.h"
namespace brpc {
......@@ -42,6 +40,7 @@ public:
bool has_request_code;
uint64_t request_code;
const ExcludedServers* excluded;
RevivePolicy* revive_policy;
};
struct SelectOut {
......
......@@ -110,7 +110,10 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) {
return ENODATA;
}
if (in.revive_policy &&
(in.revive_policy)->RejectDuringReviving(s->server_list)) {
return EREJECT;
}
uint32_t stride = 0;
size_t offset = butil::fast_rand_less_than(n);
for (size_t i = 0; i < n; ++i) {
......@@ -129,6 +132,9 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
// this failed server won't be visited again inside for
offset = (offset + stride) % n;
}
if (in.revive_policy) {
in.revive_policy->StartRevive();
}
// After we traversed the whole server list, there is still no
// available server
return EHOSTDOWN;
......
......@@ -31,6 +31,12 @@ namespace policy {
// than RoundRobinLoadBalancer.
class RandomizedLoadBalancer : public LoadBalancer {
public:
RandomizedLoadBalancer()
: _reviving(false)
//TODO(zhujiashun)
, _minimum_working_instances(2)
, _last_usable(0)
, _last_usable_change_time_ms(0) {}
bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers);
......@@ -51,6 +57,13 @@ private:
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers> _db_servers;
bool _reviving;
int64_t _minimum_working_instances;
// TODO(zhujiashun): remove mutex
butil::Mutex _mutex;
int64_t _last_usable;
int64_t _last_usable_change_time_ms;
};
} // namespace policy
......
......@@ -110,6 +110,10 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) {
return ENODATA;
}
if (in.revive_policy &&
(in.revive_policy)->RejectDuringReviving(s->server_list)) {
return EREJECT;
}
TLS tls = s.tls();
if (tls.stride == 0) {
tls.stride = GenRandomStride();
......@@ -127,6 +131,9 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0;
}
}
if (in.revive_policy) {
in.revive_policy->StartRevive();
}
s.tls() = tls;
return EHOSTDOWN;
}
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <vector>
#include "brpc/revive_policy.h"
#include "butil/scoped_lock.h"
#include "butil/synchronization/lock.h"
#include "brpc/server_id.h"
#include "brpc/socket.h"
#include "butil/fast_rand.h"
#include "butil/time.h"
namespace brpc {
DefaultRevivePolicy::DefaultRevivePolicy(
int64_t minimum_working_instances, int64_t hold_time_ms)
: _reviving(false)
, _minimum_working_instances(minimum_working_instances)
, _last_usable(0)
, _last_usable_change_time_ms(0)
, _hold_time_ms(hold_time_ms) { }
void DefaultRevivePolicy::StartRevive() {
_reviving = true;
}
bool DefaultRevivePolicy::RejectDuringReviving(
const std::vector<ServerId>& server_list) {
if (!_reviving) {
return false;
}
size_t n = server_list.size();
int usable = 0;
// TODO(zhujiashun): optimize looking process
SocketUniquePtr ptr;
for (size_t i = 0; i < n; ++i) {
if (Socket::Address(server_list[i].id, &ptr) == 0
&& !ptr->IsLogOff()) {
usable++;
}
}
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && usable != 0 &&
(butil::gettimeofday_ms() - _last_usable_change_time_ms > _hold_time_ms)
&& _last_usable == usable) {
_reviving = false;
_last_usable_change_time_ms = 0;
mu.unlock();
} else {
if (_last_usable != usable) {
_last_usable = usable;
_last_usable_change_time_ms = butil::gettimeofday_ms();
}
mu.unlock();
int rand = butil::fast_rand_less_than(_minimum_working_instances);
if (rand >= usable) {
return true;
}
}
return false;
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_REVIVE_POLICY
#define BRPC_REVIVE_POLICY
#include <cstdint>
#include <butil/synchronization/lock.h>
namespace brpc {
class ServerId;
class RevivePolicy {
public:
// TODO(zhujiashun):
virtual void StartRevive() = 0;
virtual bool RejectDuringReviving(const std::vector<ServerId>& server_list) = 0;
};
class DefaultRevivePolicy : public RevivePolicy {
public:
DefaultRevivePolicy(int64_t minimum_working_instances, int64_t hold_time_ms);
void StartRevive() override;
bool RejectDuringReviving(const std::vector<ServerId>& server_list) override;
private:
bool _reviving;
int64_t _minimum_working_instances;
butil::Mutex _mutex;
int64_t _last_usable;
int64_t _last_usable_change_time_ms;
int64_t _hold_time_ms;
};
} // namespace brpc
#endif
......@@ -290,7 +290,8 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
true,
_main_cntl->has_request_code(),
_main_cntl->_request_code,
_main_cntl->_accessed };
_main_cntl->_accessed,
NULL };
ChannelBalancer::SelectOut sel_out;
const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get())
->SelectChannel(sel_in, &sel_out);
......
......@@ -22,8 +22,15 @@
#include "brpc/policy/locality_aware_load_balancer.h"
#include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h"
#include "brpc/errno.pb.h"
#include "echo.pb.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
#include "brpc/revive_policy.h"
namespace brpc {
DECLARE_int32(health_check_interval);
namespace policy {
extern uint32_t CRCHash32(const char *key, size_t len);
extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
......@@ -206,7 +213,7 @@ void* select_server(void* arg) {
brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
uint32_t rand_seed = rand();
if (sa->hash) {
......@@ -259,7 +266,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
// Accessing empty lb should result in error.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
......@@ -562,7 +569,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
const size_t SELECT_TIMES = 1000000;
std::map<butil::EndPoint, size_t> times;
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL };
::brpc::LoadBalancer::SelectOut out(&ptr);
for (size_t i = 0; i < SELECT_TIMES; ++i) {
in.has_request_code = true;
......@@ -639,7 +646,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// consistent with weight configured.
std::map<butil::EndPoint, size_t> select_result;
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
int total_weight = 12;
std::vector<butil::EndPoint> select_servers;
......@@ -697,7 +704,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
// The first socket is excluded. The second socket is logfoff.
// The third socket is invalid.
brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude, NULL };
brpc::LoadBalancer::SelectOut out(&ptr);
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
brpc::ExcludedServers::Destroy(exclude);
......@@ -708,7 +715,6 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
"10.92.115.19:8832",
"10.42.122.201:8833",
};
std::vector<brpc::LoadBalancer*> lbs;
lbs.push_back(new brpc::policy::RoundRobinLoadBalancer);
lbs.push_back(new brpc::policy::RandomizedLoadBalancer);
......@@ -782,4 +788,186 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
}
}
TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
brpc::LoadBalancer* lb = new brpc::policy::RandomizedLoadBalancer;
brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
butil::EndPoint dummy;
ASSERT_EQ(0, str2endpoint(servers[i], &dummy));
brpc::SocketOptions options;
options.remote_side = dummy;
brpc::ServerId id(8888);
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
ASSERT_EQ(0, brpc::Socket::Address(id.id, &ptr[i]));
lb->AddServer(id);
}
brpc::SocketUniquePtr sptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL };
brpc::LoadBalancer::SelectOut out(&sptr);
ASSERT_EQ(0, lb->SelectServer(in, &out));
ptr[0]->SetFailed();
ptr[1]->SetFailed();
ASSERT_EQ(EHOSTDOWN, lb->SelectServer(in, &out));
// should reject all request since there is no available server
for (int i = 0; i < 10; ++i) {
ASSERT_EQ(brpc::EREJECT, lb->SelectServer(in, &out));
}
{
brpc::SocketUniquePtr dummy_ptr;
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
dummy_ptr->Revive();
}
// After one server is revived, the reject rate should be 50%
int num_ereject = 0;
int num_ok = 0;
for (int i = 0; i < 100; ++i) {
int rc = lb->SelectServer(in, &out);
if (rc == brpc::EREJECT) {
num_ereject++;
} else if (rc == 0) {
num_ok++;
} else {
ASSERT_TRUE(false);
}
}
ASSERT_TRUE(abs(num_ereject - num_ok) < 20);
// TODO(zhujiashun): longer than interval
int64_t sleep_time_ms = 2010;
bthread_usleep(sleep_time_ms * 1000);
// After enough waiting time, traffic should be sent to all available servers.
for (int i = 0; i < 10; ++i) {
ASSERT_EQ(0, lb->SelectServer(in, &out));
}
}
class EchoServiceImpl : public test::EchoService {
public:
EchoServiceImpl()
: _num_request(0) {}
virtual ~EchoServiceImpl() {}
virtual void Echo(google::protobuf::RpcController* cntl_base,
const test::EchoRequest* req,
test::EchoResponse* res,
google::protobuf::Closure* done) {
//brpc::Controller* cntl =
// static_cast<brpc::Controller*>(cntl_base);
brpc::ClosureGuard done_guard(done);
int p = _num_request.fetch_add(1, butil::memory_order_relaxed);
// max qps is 50
if (p < 70) {
bthread_usleep(100 * 1000);
_num_request.fetch_sub(1, butil::memory_order_relaxed);
res->set_message("OK");
} else {
_num_request.fetch_sub(1, butil::memory_order_relaxed);
bthread_usleep(1000 * 1000);
}
return;
}
butil::atomic<int> _num_request;
};
class Done : public google::protobuf::Closure {
public:
Done()
: num_failed(NULL)
, num_reject(NULL) {}
void Run() {
if (cntl.Failed()) {
if (num_failed) {
num_failed->fetch_add(1, butil::memory_order_relaxed);
}
if (cntl.ErrorCode() == brpc::EREJECT && num_reject) {
num_reject->fetch_add(1, butil::memory_order_relaxed);
}
}
}
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
butil::atomic<int32_t>* num_failed;
butil::atomic<int32_t>* num_reject;
};
TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_size", "20");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "5000");
char* lb_algo[] = { "rr" , "random" };
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
options.timeout_ms = 300;
options.enable_circuit_breaker = true;
options.revive_policy = new brpc::DefaultRevivePolicy(2, 2000 /*2s*/);
// Set max_retry to 0 so that health check of servers
// are not continuous.
options.max_retry = 0;
ASSERT_EQ(channel.Init("list://127.0.0.1:7777,127.0.0.1:7778",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), 0);
test::EchoRequest req;
req.set_message("123");
test::EchoResponse res;
test::EchoService_Stub stub(&channel);
// trigger one server to health check
{
brpc::Controller cntl;
stub.Echo(&cntl, &req, &res, NULL);
}
bthread_usleep(500000);
// trigger the other server to health check
{
brpc::Controller cntl;
stub.Echo(&cntl, &req, &res, NULL);
}
bthread_usleep(500000);
butil::EndPoint point(butil::IP_ANY, 7777);
brpc::Server server;
EchoServiceImpl service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(point, NULL));
butil::EndPoint point2(butil::IP_ANY, 7778);
brpc::Server server2;
EchoServiceImpl service2;
ASSERT_EQ(0, server2.AddService(&service2, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server2.Start(point2, NULL));
int64_t start_ms = butil::gettimeofday_ms();
butil::atomic<int32_t> num_reject(0);
while ((butil::gettimeofday_ms() - start_ms) <
brpc::FLAGS_health_check_interval * 1000 + 10) {
Done* done = new Done;
done->num_reject = &num_reject;
done->req.set_message("123");
stub.Echo(&done->cntl, &done->req, &done->res, done);
bthread_usleep(1000);
}
// should recover now
butil::atomic<int32_t> num_failed(0);
for (int i = 0; i < 1000; ++i) {
Done* done = new Done;
done->req.set_message("123");
done->num_failed = &num_failed;
stub.Echo(&done->cntl, &done->req, &done->res, done);
bthread_usleep(1000);
}
bthread_usleep(1050*1000 /* sleep longer than timeout of service */);
ASSERT_EQ(0, num_failed.load(butil::memory_order_relaxed));
ASSERT_TRUE(num_reject.load(butil::memory_order_relaxed) > 1500);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment