Commit 6cb4475a authored by zhujiashun

revived_from_all_failed: optimize the process of getting available servers

parent 890679ec
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
// Authors: Jiashun Zhu(zhujiashun@bilibili.com) // Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <vector> #include <vector>
#include <gflags/gflags.h>
#include "brpc/revive_policy.h" #include "brpc/revive_policy.h"
#include "butil/scoped_lock.h" #include "butil/scoped_lock.h"
#include "butil/synchronization/lock.h" #include "butil/synchronization/lock.h"
...@@ -25,14 +26,18 @@ ...@@ -25,14 +26,18 @@
namespace brpc { namespace brpc {
DEFINE_int64(detect_available_server_interval_ms, 10, "The interval "
"to detect available server count in DefaultRevivePolicy");
DefaultRevivePolicy::DefaultRevivePolicy( DefaultRevivePolicy::DefaultRevivePolicy(
int64_t minimum_working_instances, int64_t hold_time_ms) int64_t minimum_working_instances, int64_t hold_time_ms)
: _reviving(false) : _reviving(false)
, _minimum_working_instances(minimum_working_instances) , _minimum_working_instances(minimum_working_instances)
, _last_usable(0) , _last_usable(0)
, _last_usable_change_time_ms(0) , _last_usable_change_time_ms(0)
, _hold_time_ms(hold_time_ms) { } , _hold_time_ms(hold_time_ms)
, _usable_cache(0)
, _usable_cache_time_ms(0) { }
void DefaultRevivePolicy::StartReviving() { void DefaultRevivePolicy::StartReviving() {
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
...@@ -41,12 +46,11 @@ void DefaultRevivePolicy::StartReviving() { ...@@ -41,12 +46,11 @@ void DefaultRevivePolicy::StartReviving() {
bool DefaultRevivePolicy::StopRevivingIfNecessary() { bool DefaultRevivePolicy::StopRevivingIfNecessary() {
int64_t now_ms = butil::gettimeofday_ms(); int64_t now_ms = butil::gettimeofday_ms();
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (!_reviving) { if (!_reviving) {
mu.unlock();
return false; return false;
} }
{
std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable_change_time_ms != 0 && _last_usable != 0 && if (_last_usable_change_time_ms != 0 && _last_usable != 0 &&
(now_ms - _last_usable_change_time_ms > _hold_time_ms)) { (now_ms - _last_usable_change_time_ms > _hold_time_ms)) {
_reviving = false; _reviving = false;
...@@ -58,25 +62,33 @@ bool DefaultRevivePolicy::StopRevivingIfNecessary() { ...@@ -58,25 +62,33 @@ bool DefaultRevivePolicy::StopRevivingIfNecessary() {
return true; return true;
} }
bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) { int DefaultRevivePolicy::GetUsableServerCount(
{ int64_t now_ms, const std::vector<ServerId>& server_list) {
std::unique_lock<butil::Mutex> mu(_mutex); if (now_ms - _usable_cache_time_ms < FLAGS_detect_available_server_interval_ms) {
if (!_reviving) { return _usable_cache;
mu.unlock();
return false;
}
} }
size_t n = server_list.size();
int usable = 0; int usable = 0;
size_t n = server_list.size();
SocketUniquePtr ptr; SocketUniquePtr ptr;
// TODO(zhujiashun): optimize O(N)
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
if (Socket::Address(server_list[i].id, &ptr) == 0 if (Socket::Address(server_list[i].id, &ptr) == 0
&& !ptr->IsLogOff()) { && !ptr->IsLogOff()) {
usable++; usable++;
} }
} }
std::unique_lock<butil::Mutex> mu(_mutex);
_usable_cache = usable;
_usable_cache_time_ms = now_ms;
return _usable_cache;
}
bool DefaultRevivePolicy::DoReject(const std::vector<ServerId>& server_list) {
if (!_reviving) {
return false;
}
int64_t now_ms = butil::gettimeofday_ms(); int64_t now_ms = butil::gettimeofday_ms();
int usable = GetUsableServerCount(now_ms, server_list);
{ {
std::unique_lock<butil::Mutex> mu(_mutex); std::unique_lock<butil::Mutex> mu(_mutex);
if (_last_usable != usable) { if (_last_usable != usable) {
......
...@@ -59,6 +59,9 @@ public: ...@@ -59,6 +59,9 @@ public:
bool DoReject(const std::vector<ServerId>& server_list); bool DoReject(const std::vector<ServerId>& server_list);
bool StopRevivingIfNecessary(); bool StopRevivingIfNecessary();
private:
int GetUsableServerCount(int64_t now_ms, const std::vector<ServerId>& server_list);
private: private:
bool _reviving; bool _reviving;
int64_t _minimum_working_instances; int64_t _minimum_working_instances;
...@@ -66,6 +69,8 @@ private: ...@@ -66,6 +69,8 @@ private:
int64_t _last_usable; int64_t _last_usable;
int64_t _last_usable_change_time_ms; int64_t _last_usable_change_time_ms;
int64_t _hold_time_ms; int64_t _hold_time_ms;
int64_t _usable_cache;
int64_t _usable_cache_time_ms;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
namespace brpc { namespace brpc {
DECLARE_int32(health_check_interval); DECLARE_int32(health_check_interval);
DECLARE_int64(detect_available_server_interval_ms);
namespace policy { namespace policy {
extern uint32_t CRCHash32(const char *key, size_t len); extern uint32_t CRCHash32(const char *key, size_t len);
extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len)); extern const char* GetHashName(uint32_t (*hasher)(const void* key, size_t len));
...@@ -831,6 +832,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { ...@@ -831,6 +832,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr)); ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
dummy_ptr->Revive(); dummy_ptr->Revive();
} }
bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
// After one server is revived, the reject rate should be 50% // After one server is revived, the reject rate should be 50%
int num_ereject = 0; int num_ereject = 0;
int num_ok = 0; int num_ok = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment