Commit a1c05b58 authored by zhujiashun's avatar zhujiashun

revived_from_all_failed: add KeyValuePairsSplitter & remove revive_policy in channelOptions

parent e3c341af
...@@ -52,7 +52,6 @@ ChannelOptions::ChannelOptions() ...@@ -52,7 +52,6 @@ ChannelOptions::ChannelOptions()
, auth(NULL) , auth(NULL)
, retry_policy(NULL) , retry_policy(NULL)
, ns_filter(NULL) , ns_filter(NULL)
, revive_policy(NULL)
{} {}
ChannelSSLOptions* ChannelOptions::mutable_ssl_options() { ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
...@@ -526,7 +525,6 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -526,7 +525,6 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
} else { } else {
cntl->_deadline_us = -1; cntl->_deadline_us = -1;
} }
cntl->_revive_policy = _options.revive_policy;
cntl->IssueRPC(start_send_real_us); cntl->IssueRPC(start_send_real_us);
if (done == NULL) { if (done == NULL) {
...@@ -564,7 +562,7 @@ int Channel::CheckHealth() { ...@@ -564,7 +562,7 @@ int Channel::CheckHealth() {
return -1; return -1;
} else { } else {
SocketUniquePtr tmp_sock; SocketUniquePtr tmp_sock;
LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL, NULL }; LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
LoadBalancer::SelectOut sel_out(&tmp_sock); LoadBalancer::SelectOut sel_out(&tmp_sock);
return _lb->SelectServer(sel_in, &sel_out); return _lb->SelectServer(sel_in, &sel_out);
} }
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include "brpc/details/profiler_linker.h" #include "brpc/details/profiler_linker.h"
#include "brpc/retry_policy.h" #include "brpc/retry_policy.h"
#include "brpc/naming_service_filter.h" #include "brpc/naming_service_filter.h"
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
...@@ -129,13 +128,6 @@ struct ChannelOptions { ...@@ -129,13 +128,6 @@ struct ChannelOptions {
// Default: "" // Default: ""
std::string connection_group; std::string connection_group;
// Customize the revive policy after all servers are shutdown. The
// interface is defined in src/brpc/revive_policy.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
RevivePolicy* revive_policy;
private: private:
// SSLOptions is large and not often used, allocate it on heap to // SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases. // prevent ChannelOptions from being bloated in most cases.
......
...@@ -252,7 +252,6 @@ void Controller::ResetPods() { ...@@ -252,7 +252,6 @@ void Controller::ResetPods() {
_request_stream = INVALID_STREAM_ID; _request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID; _response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL; _remote_stream_settings = NULL;
_revive_policy = NULL;
} }
Controller::Call::Call(Controller::Call* rhs) Controller::Call::Call(Controller::Call* rhs)
...@@ -997,7 +996,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -997,7 +996,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
} else { } else {
LoadBalancer::SelectIn sel_in = LoadBalancer::SelectIn sel_in =
{ start_realtime_us, true, { start_realtime_us, true,
has_request_code(), _request_code, _accessed, _revive_policy }; has_request_code(), _request_code, _accessed };
LoadBalancer::SelectOut sel_out(&tmp_sock); LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out); const int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) { if (rc != 0) {
......
...@@ -41,7 +41,6 @@ ...@@ -41,7 +41,6 @@
#include "brpc/progressive_attachment.h" // ProgressiveAttachment #include "brpc/progressive_attachment.h" // ProgressiveAttachment
#include "brpc/progressive_reader.h" // ProgressiveReader #include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/grpc.h" #include "brpc/grpc.h"
#include "brpc/revive_policy.h"
// EAUTH is defined in MAC // EAUTH is defined in MAC
#ifndef EAUTH #ifndef EAUTH
...@@ -716,7 +715,6 @@ private: ...@@ -716,7 +715,6 @@ private:
uint64_t _request_code; uint64_t _request_code;
SocketId _single_server_id; SocketId _single_server_id;
butil::intrusive_ptr<SharedLoadBalancer> _lb; butil::intrusive_ptr<SharedLoadBalancer> _lb;
RevivePolicy* _revive_policy;
// for passing parameters to created bthread, don't modify it otherwhere. // for passing parameters to created bthread, don't modify it otherwhere.
CompletionInfo _tmp_completion_info; CompletionInfo _tmp_completion_info;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "brpc/shared_object.h" // SharedObject #include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId #include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T> #include "brpc/extension.h" // Extension<T>
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
...@@ -40,7 +39,6 @@ public: ...@@ -40,7 +39,6 @@ public:
bool has_request_code; bool has_request_code;
uint64_t request_code; uint64_t request_code;
const ExcludedServers* excluded; const ExcludedServers* excluded;
RevivePolicy* revive_policy;
}; };
struct SelectOut { struct SelectOut {
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "butil/containers/flat_map.h" #include "butil/containers/flat_map.h"
#include "butil/errno.h" #include "butil/errno.h"
#include "butil/strings/string_number_conversions.h" #include "butil/strings/string_number_conversions.h"
#include "butil/strings/string_split.h"
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/policy/consistent_hashing_load_balancer.h" #include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h" #include "brpc/policy/hasher.h"
...@@ -302,19 +303,6 @@ int ConsistentHashingLoadBalancer::SelectServer( ...@@ -302,19 +303,6 @@ int ConsistentHashingLoadBalancer::SelectServer(
if (s->empty()) { if (s->empty()) {
return ENODATA; return ENODATA;
} }
RevivePolicy* rp = in.revive_policy;
if (rp && rp->StopRevivingIfNecessary()) {
std::set<ServerId> server_list;
for (auto server: *s) {
server_list.insert(server.server_sock);
}
std::vector<ServerId> server_list_distinct(
server_list.begin(), server_list.end());
if (rp->DoReject(server_list_distinct)) {
return EREJECT;
}
}
std::vector<Node>::const_iterator choice = std::vector<Node>::const_iterator choice =
std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code); std::lower_bound(s->begin(), s->end(), (uint32_t)in.request_code);
if (choice == s->end()) { if (choice == s->end()) {
...@@ -332,9 +320,6 @@ int ConsistentHashingLoadBalancer::SelectServer( ...@@ -332,9 +320,6 @@ int ConsistentHashingLoadBalancer::SelectServer(
} }
} }
} }
if (rp) {
rp->StartReviving();
}
return EHOSTDOWN; return EHOSTDOWN;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "butil/fast_rand.h" #include "butil/fast_rand.h"
#include "brpc/socket.h" #include "brpc/socket.h"
#include "brpc/policy/randomized_load_balancer.h" #include "brpc/policy/randomized_load_balancer.h"
#include "butil/strings/string_number_conversions.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -31,6 +31,10 @@ inline uint32_t GenRandomStride() { ...@@ -31,6 +31,10 @@ inline uint32_t GenRandomStride() {
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))]; return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
} }
RandomizedLoadBalancer::RandomizedLoadBalancer()
: _revive_policy(NULL)
{}
bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
if (bg.server_list.capacity() < 128) { if (bg.server_list.capacity() < 128) {
bg.server_list.reserve(128); bg.server_list.reserve(128);
...@@ -110,9 +114,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -110,9 +114,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
RevivePolicy* rp = in.revive_policy; if (_revive_policy && _revive_policy->StopRevivingIfNecessary()) {
if (rp && rp->StopRevivingIfNecessary()) { if (_revive_policy->DoReject(s->server_list)) {
if (rp->DoReject(s->server_list)) {
return EREJECT; return EREJECT;
} }
} }
...@@ -134,8 +137,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -134,8 +137,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
// this failed server won't be visited again inside for // this failed server won't be visited again inside for
offset = (offset + stride) % n; offset = (offset + stride) % n;
} }
if (rp) { if (_revive_policy) {
rp->StartReviving(); _revive_policy->StartReviving();
} }
// After we traversed the whole server list, there is still no // After we traversed the whole server list, there is still no
// available server // available server
...@@ -143,8 +146,13 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -143,8 +146,13 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
} }
RandomizedLoadBalancer* RandomizedLoadBalancer::New( RandomizedLoadBalancer* RandomizedLoadBalancer::New(
const butil::StringPiece&) const { const butil::StringPiece& params) const {
return new (std::nothrow) RandomizedLoadBalancer; RandomizedLoadBalancer* lb = new (std::nothrow) RandomizedLoadBalancer;
if (lb && !lb->SetParameters(params)) {
delete lb;
lb = NULL;
}
return lb;
} }
void RandomizedLoadBalancer::Destroy() { void RandomizedLoadBalancer::Destroy() {
...@@ -170,5 +178,9 @@ void RandomizedLoadBalancer::Describe( ...@@ -170,5 +178,9 @@ void RandomizedLoadBalancer::Describe(
os << '}'; os << '}';
} }
bool RandomizedLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRevivePolicyByParams(params, &_revive_policy);
}
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -31,6 +31,7 @@ namespace policy { ...@@ -31,6 +31,7 @@ namespace policy {
// than RoundRobinLoadBalancer. // than RoundRobinLoadBalancer.
class RandomizedLoadBalancer : public LoadBalancer { class RandomizedLoadBalancer : public LoadBalancer {
public: public:
RandomizedLoadBalancer();
bool AddServer(const ServerId& id); bool AddServer(const ServerId& id);
bool RemoveServer(const ServerId& id); bool RemoveServer(const ServerId& id);
size_t AddServersInBatch(const std::vector<ServerId>& servers); size_t AddServersInBatch(const std::vector<ServerId>& servers);
...@@ -45,12 +46,14 @@ private: ...@@ -45,12 +46,14 @@ private:
std::vector<ServerId> server_list; std::vector<ServerId> server_list;
std::map<ServerId, size_t> server_map; std::map<ServerId, size_t> server_map;
}; };
bool SetParameters(const butil::StringPiece& params);
static bool Add(Servers& bg, const ServerId& id); static bool Add(Servers& bg, const ServerId& id);
static bool Remove(Servers& bg, const ServerId& id); static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers> _db_servers; butil::DoublyBufferedData<Servers> _db_servers;
std::shared_ptr<RevivePolicy> _revive_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -110,9 +110,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -110,9 +110,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if (n == 0) { if (n == 0) {
return ENODATA; return ENODATA;
} }
RevivePolicy* rp = in.revive_policy; if (_revive_policy && _revive_policy->StopRevivingIfNecessary()) {
if (rp && rp->StopRevivingIfNecessary()) { if (_revive_policy->DoReject(s->server_list)) {
if (rp->DoReject(s->server_list)) {
return EREJECT; return EREJECT;
} }
} }
...@@ -133,16 +132,21 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { ...@@ -133,16 +132,21 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
return 0; return 0;
} }
} }
if (rp) { if (_revive_policy) {
rp->StartReviving(); _revive_policy->StartReviving();
} }
s.tls() = tls; s.tls() = tls;
return EHOSTDOWN; return EHOSTDOWN;
} }
RoundRobinLoadBalancer* RoundRobinLoadBalancer::New( RoundRobinLoadBalancer* RoundRobinLoadBalancer::New(
const butil::StringPiece&) const { const butil::StringPiece& params) const {
return new (std::nothrow) RoundRobinLoadBalancer; RoundRobinLoadBalancer* lb = new (std::nothrow) RoundRobinLoadBalancer;
if (lb && !lb->SetParameters(params)) {
delete lb;
lb = NULL;
}
return lb;
} }
void RoundRobinLoadBalancer::Destroy() { void RoundRobinLoadBalancer::Destroy() {
...@@ -168,5 +172,10 @@ void RoundRobinLoadBalancer::Describe( ...@@ -168,5 +172,10 @@ void RoundRobinLoadBalancer::Describe(
os << '}'; os << '}';
} }
bool RoundRobinLoadBalancer::SetParameters(const butil::StringPiece& params) {
return GetRevivePolicyByParams(params, &_revive_policy);
}
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <map> // std::map #include <map> // std::map
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
#include "brpc/load_balancer.h" #include "brpc/load_balancer.h"
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -49,12 +49,14 @@ private: ...@@ -49,12 +49,14 @@ private:
uint32_t stride; uint32_t stride;
uint32_t offset; uint32_t offset;
}; };
bool SetParameters(const butil::StringPiece& params);
static bool Add(Servers& bg, const ServerId& id); static bool Add(Servers& bg, const ServerId& id);
static bool Remove(Servers& bg, const ServerId& id); static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
butil::DoublyBufferedData<Servers, TLS> _db_servers; butil::DoublyBufferedData<Servers, TLS> _db_servers;
std::shared_ptr<RevivePolicy> _revive_policy;
}; };
} // namespace policy } // namespace policy
......
...@@ -158,17 +158,6 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -158,17 +158,6 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
if (s->server_list.empty()) { if (s->server_list.empty()) {
return ENODATA; return ENODATA;
} }
RevivePolicy* rp = in.revive_policy;
if (rp && rp->StopRevivingIfNecessary()) {
std::vector<ServerId> server_list;
server_list.reserve(s->server_list.size());
for (auto server: s->server_list) {
server_list.emplace_back(server.id);
}
if (rp->DoReject(server_list)) {
return EREJECT;
}
}
TLS& tls = s.tls(); TLS& tls = s.tls();
if (tls.IsNeededCaculateNewStride(s->weight_sum, s->server_list.size())) { if (tls.IsNeededCaculateNewStride(s->weight_sum, s->server_list.size())) {
if (tls.stride == 0) { if (tls.stride == 0) {
...@@ -210,9 +199,6 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* ...@@ -210,9 +199,6 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
tls_temp.remain_server = tls.remain_server; tls_temp.remain_server = tls.remain_server;
} }
} }
if (rp) {
rp->StartReviving();
}
return EHOSTDOWN; return EHOSTDOWN;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "brpc/socket.h" #include "brpc/socket.h"
#include "butil/fast_rand.h" #include "butil/fast_rand.h"
#include "butil/time.h" #include "butil/time.h"
#include "butil/string_splitter.h"
namespace brpc { namespace brpc {
...@@ -104,4 +105,43 @@ bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) { ...@@ -104,4 +105,43 @@ bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) {
return false; return false;
} }
bool GetRevivePolicyByParams(const butil::StringPiece& params,
std::shared_ptr<RevivePolicy>* ptr_out) {
int64_t minimum_working_instances = -1;
int64_t hold_time_ms = -1;
bool has_meet_params = false;
for (butil::KeyValuePairsSplitter sp(params.begin(), params.end(), '=', ' ');
sp; ++sp) {
if (sp.value().empty()) {
LOG(ERROR) << "Empty value for " << sp.key() << " in lb parameter";
return false;
}
if (sp.key() == "minimum_working_instances") {
if (!butil::StringToInt64(sp.value(), &minimum_working_instances)) {
return false;
}
has_meet_params = true;
continue;
} else if (sp.key() == "hold_time_ms") {
if (!butil::StringToInt64(sp.value(), &hold_time_ms)) {
return false;
}
has_meet_params = true;
continue;
}
LOG(ERROR) << "Failed to set this unknown parameters " << sp.key_and_value();
}
if (minimum_working_instances > 0 && hold_time_ms > 0) {
ptr_out->reset(
new DefaultRevivePolicy(minimum_working_instances, hold_time_ms));
} else if (has_meet_params) {
// In this case, user set some params but not in the right way, just return
// false to let user take care of this situation.
LOG(ERROR) << "Invalid params=`" << params << "'";
return false;
}
return true;
}
} // namespace brpc } // namespace brpc
...@@ -18,7 +18,10 @@ ...@@ -18,7 +18,10 @@
#define BRPC_REVIVE_POLICY #define BRPC_REVIVE_POLICY
#include <cstdint> #include <cstdint>
#include <butil/synchronization/lock.h> #include <memory>
#include "butil/synchronization/lock.h"
#include "butil/strings/string_piece.h"
#include "butil/strings/string_number_conversions.h"
namespace brpc { namespace brpc {
...@@ -73,6 +76,11 @@ private: ...@@ -73,6 +76,11 @@ private:
int64_t _usable_cache_time_ms; int64_t _usable_cache_time_ms;
}; };
// Return a DefaultRevivePolicy object by params. The caller is responsible
// for memory management of the return value.
bool GetRevivePolicyByParams(const butil::StringPiece& params,
std::shared_ptr<RevivePolicy>* ptr_out);
} // namespace brpc } // namespace brpc
#endif #endif
......
...@@ -290,8 +290,7 @@ int Sender::IssueRPC(int64_t start_realtime_us) { ...@@ -290,8 +290,7 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
true, true,
_main_cntl->has_request_code(), _main_cntl->has_request_code(),
_main_cntl->_request_code, _main_cntl->_request_code,
_main_cntl->_accessed, _main_cntl->_accessed };
NULL };
ChannelBalancer::SelectOut sel_out; ChannelBalancer::SelectOut sel_out;
const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get()) const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get())
->SelectChannel(sel_in, &sel_out); ->SelectChannel(sel_in, &sel_out);
......
...@@ -27,7 +27,6 @@ ...@@ -27,7 +27,6 @@
#include "brpc/channel.h" #include "brpc/channel.h"
#include "brpc/controller.h" #include "brpc/controller.h"
#include "brpc/server.h" #include "brpc/server.h"
#include "brpc/revive_policy.h"
namespace brpc { namespace brpc {
DECLARE_int32(health_check_interval); DECLARE_int32(health_check_interval);
...@@ -214,7 +213,7 @@ void* select_server(void* arg) { ...@@ -214,7 +213,7 @@ void* select_server(void* arg) {
brpc::LoadBalancer* c = sa->lb; brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap; CountMap *selected_count = new CountMap;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
uint32_t rand_seed = rand(); uint32_t rand_seed = rand();
if (sa->hash) { if (sa->hash) {
...@@ -267,7 +266,7 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -267,7 +266,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
// Accessing empty lb should result in error. // Accessing empty lb should result in error.
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(ENODATA, lb->SelectServer(in, &out)); ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
...@@ -570,7 +569,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) { ...@@ -570,7 +569,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
const size_t SELECT_TIMES = 1000000; const size_t SELECT_TIMES = 1000000;
std::map<butil::EndPoint, size_t> times; std::map<butil::EndPoint, size_t> times;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
::brpc::LoadBalancer::SelectOut out(&ptr); ::brpc::LoadBalancer::SelectOut out(&ptr);
for (size_t i = 0; i < SELECT_TIMES; ++i) { for (size_t i = 0; i < SELECT_TIMES; ++i) {
in.has_request_code = true; in.has_request_code = true;
...@@ -647,7 +646,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) { ...@@ -647,7 +646,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// consistent with weight configured. // consistent with weight configured.
std::map<butil::EndPoint, size_t> select_result; std::map<butil::EndPoint, size_t> select_result;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
int total_weight = 12; int total_weight = 12;
std::vector<butil::EndPoint> select_servers; std::vector<butil::EndPoint> select_servers;
...@@ -705,7 +704,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) { ...@@ -705,7 +704,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
// The first socket is excluded. The second socket is logfoff. // The first socket is excluded. The second socket is logfoff.
// The third socket is invalid. // The third socket is invalid.
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, exclude };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out)); EXPECT_EQ(EHOSTDOWN, wrrlb.SelectServer(in, &out));
brpc::ExcludedServers::Destroy(exclude); brpc::ExcludedServers::Destroy(exclude);
...@@ -791,15 +790,13 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) { ...@@ -791,15 +790,13 @@ TEST_F(LoadBalancerTest, health_check_no_valid_server) {
TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
brpc::LoadBalancer* lb = NULL; brpc::LoadBalancer* lb = NULL;
int rand = butil::fast_rand_less_than(4); // TODO(zhujiashun)
int rand = butil::fast_rand_less_than(1);
if (rand == 0) { if (rand == 0) {
lb = new brpc::policy::RoundRobinLoadBalancer; brpc::policy::RandomizedLoadBalancer rlb;
lb = rlb.New("minimum_working_instances=2 hold_time_ms=2000");
} else if (rand == 1) { } else if (rand == 1) {
lb = new brpc::policy::RandomizedLoadBalancer; lb = new brpc::policy::RoundRobinLoadBalancer;
} else if (rand == 2) {
lb = new brpc::policy::WeightedRoundRobinLoadBalancer;
} else {
lb = new brpc::policy::ConsistentHashingLoadBalancer(brpc::policy::MurmurHash32);
} }
brpc::SocketUniquePtr ptr[2]; brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) { for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
...@@ -814,9 +811,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { ...@@ -814,9 +811,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
lb->AddServer(id); lb->AddServer(id);
} }
brpc::SocketUniquePtr sptr; brpc::SocketUniquePtr sptr;
int64_t hold_time_ms = 2000; brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL };
brpc::RevivePolicy* rp = new brpc::DefaultRevivePolicy(2, hold_time_ms);
brpc::LoadBalancer::SelectIn in = { 0, false, true, 0u, NULL, rp };
brpc::LoadBalancer::SelectOut out(&sptr); brpc::LoadBalancer::SelectOut out(&sptr);
ASSERT_EQ(0, lb->SelectServer(in, &out)); ASSERT_EQ(0, lb->SelectServer(in, &out));
...@@ -847,7 +842,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { ...@@ -847,7 +842,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
} }
} }
ASSERT_TRUE(abs(num_ereject - num_ok) < 30); ASSERT_TRUE(abs(num_ereject - num_ok) < 30);
bthread_usleep((hold_time_ms + 10) * 1000); bthread_usleep((2000 + 10) * 1000);
// After enough waiting time, traffic should be sent to all available servers. // After enough waiting time, traffic should be sent to all available servers.
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
...@@ -912,18 +907,17 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) { ...@@ -912,18 +907,17 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_intergrated) {
GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_short_window_error_percent", "30");
GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "5000"); GFLAGS_NS::SetCommandLineOption("circuit_breaker_max_isolation_duration_ms", "5000");
const char* lb_algo[] = { "rr" , "random", "wrr", "c_murmurhash" }; const char* lb_algo[] = { "random:minimum_working_instances=2 hold_time_ms=2000",
"rr:minimum_working_instances=2 hold_time_ms=2000" };
brpc::Channel channel; brpc::Channel channel;
brpc::ChannelOptions options; brpc::ChannelOptions options;
options.protocol = "http"; options.protocol = "http";
options.timeout_ms = 300; options.timeout_ms = 300;
options.enable_circuit_breaker = true; options.enable_circuit_breaker = true;
options.revive_policy = new brpc::DefaultRevivePolicy(2, 2000 /*2s*/);
// Set max_retry to 0 so that the time of health check of different servers // Set max_retry to 0 so that the time of health check of different servers
// are not continuous. // are not continuous.
options.max_retry = 0; options.max_retry = 0;
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50, 127.0.0.1:7778 50",
ASSERT_EQ(channel.Init("list://127.0.0.1:7777 50,127.0.0.1:7778 50",
lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))], lb_algo[butil::fast_rand_less_than(ARRAY_SIZE(lb_algo))],
&options), 0); &options), 0);
......
...@@ -475,54 +475,3 @@ TEST(URITest, query_remover_key_value_not_changed_after_modified_query) { ...@@ -475,54 +475,3 @@ TEST(URITest, query_remover_key_value_not_changed_after_modified_query) {
ASSERT_EQ(qr.value(), "value2"); ASSERT_EQ(qr.value(), "value2");
} }
TEST(URITest, query_splitter_sanity) {
std::string query = "key1=value1&key2=value2&key3=value3";
{
brpc::QuerySplitter qs(query);
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
{
brpc::QuerySplitter qs(query.data(), query.data() + query.size());
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
{
brpc::QuerySplitter qs(query.c_str());
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key1");
ASSERT_EQ(qs.value(), "value1");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key2");
ASSERT_EQ(qs.value(), "value2");
++qs;
ASSERT_TRUE(qs);
ASSERT_EQ(qs.key(), "key3");
ASSERT_EQ(qs.value(), "value3");
++qs;
ASSERT_FALSE(qs);
}
}
...@@ -386,4 +386,56 @@ TEST_F(StringSplitterTest, key_value_pairs_splitter_sanity) { ...@@ -386,4 +386,56 @@ TEST_F(StringSplitterTest, key_value_pairs_splitter_sanity) {
} }
} }
TEST_F(StringSplitterTest, key_value_pairs_splitter_sanity) {
std::string kvstr = "key1=value1&key2=value2&key3=value3";
{
butil::KeyValuePairsSplitter splitter(kvstr, '=', '&');
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key1");
ASSERT_EQ(splitter.value(), "value1");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key2");
ASSERT_EQ(splitter.value(), "value2");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key3");
ASSERT_EQ(splitter.value(), "value3");
++splitter;
ASSERT_FALSE(splitter);
}
{
butil::KeyValuePairsSplitter splitter(kvstr.data(), kvstr.data() + kvstr.size(), '=', '&');
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key1");
ASSERT_EQ(splitter.value(), "value1");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key2");
ASSERT_EQ(splitter.value(), "value2");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key3");
ASSERT_EQ(splitter.value(), "value3");
++splitter;
ASSERT_FALSE(splitter);
}
{
butil::KeyValuePairsSplitter splitter(kvstr.c_str(), '=', '&');
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key1");
ASSERT_EQ(splitter.value(), "value1");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key2");
ASSERT_EQ(splitter.value(), "value2");
++splitter;
ASSERT_TRUE(splitter);
ASSERT_EQ(splitter.key(), "key3");
ASSERT_EQ(splitter.value(), "value3");
++splitter;
ASSERT_FALSE(splitter);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment