Commit d9bab076 authored by gejun's avatar gejun

Make LALB sensitive to errors & remove useless flags in LALB

parent 4749ccd4
...@@ -425,23 +425,9 @@ int Channel::CheckHealth() { ...@@ -425,23 +425,9 @@ int Channel::CheckHealth() {
return -1; return -1;
} else { } else {
SocketUniquePtr tmp_sock; SocketUniquePtr tmp_sock;
LoadBalancer::SelectIn sel_in = { 0, false, 0, NULL }; LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
LoadBalancer::SelectOut sel_out(&tmp_sock); LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out); return _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) {
return rc;
}
if (sel_out.need_feedback) {
LoadBalancer::CallInfo info;
info.in.begin_time_us = 0;
info.in.has_request_code = false;
info.in.request_code = 0;
info.in.excluded = NULL;
info.server_id = tmp_sock->id();
info.error_code = ECANCELED;
_lb->Feedback(info);
}
return 0;
} }
} }
......
...@@ -648,6 +648,9 @@ void* Controller::RunEndRPC(void* arg) { ...@@ -648,6 +648,9 @@ void* Controller::RunEndRPC(void* arg) {
} }
inline bool does_error_affect_main_socket(int error_code) { inline bool does_error_affect_main_socket(int error_code) {
// Errors tested in this function are reported by pooled connections
// and very likely to indicate that the server-side is down and the socket
// should be health-checked.
return error_code == ECONNREFUSED || return error_code == ECONNREFUSED ||
error_code == ENETUNREACH || error_code == ENETUNREACH ||
error_code == EHOSTUNREACH || error_code == EHOSTUNREACH ||
...@@ -732,13 +735,8 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/, ...@@ -732,13 +735,8 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
sending_sock.reset(NULL); sending_sock.reset(NULL);
if (need_feedback) { if (need_feedback) {
LoadBalancer::CallInfo info; const LoadBalancer::CallInfo info =
info.in.begin_time_us = begin_time_us; { begin_time_us, peer_id, error_code, c };
info.in.has_request_code = c->has_request_code();
info.in.request_code = c->request_code();
info.in.excluded = NULL;
info.server_id = peer_id;
info.error_code = error_code;
c->_lb->Feedback(info); c->_lb->Feedback(info);
} }
} }
...@@ -947,7 +945,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -947,7 +945,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.peer_id = _single_server_id; _current_call.peer_id = _single_server_id;
} else { } else {
LoadBalancer::SelectIn sel_in = LoadBalancer::SelectIn sel_in =
{ start_realtime_us, has_request_code(), _request_code, _accessed }; { start_realtime_us, true,
has_request_code(), _request_code, _accessed };
LoadBalancer::SelectOut sel_out(&tmp_sock); LoadBalancer::SelectOut sel_out(&tmp_sock);
const int rc = _lb->SelectServer(sel_in, &sel_out); const int rc = _lb->SelectServer(sel_in, &sel_out);
if (rc != 0) { if (rc != 0) {
......
...@@ -28,11 +28,15 @@ ...@@ -28,11 +28,15 @@
namespace brpc { namespace brpc {
class Controller;
// Select a server from a set of servers (in form of ServerId). // Select a server from a set of servers (in form of ServerId).
class LoadBalancer : public NonConstDescribable, public Destroyable { class LoadBalancer : public NonConstDescribable, public Destroyable {
public: public:
struct SelectIn { struct SelectIn {
int64_t begin_time_us; int64_t begin_time_us;
// Weight of different nodes could be changed.
bool changable_weights;
bool has_request_code; bool has_request_code;
uint64_t request_code; uint64_t request_code;
const ExcludedServers* excluded; const ExcludedServers* excluded;
...@@ -46,9 +50,17 @@ public: ...@@ -46,9 +50,17 @@ public:
}; };
struct CallInfo { struct CallInfo {
LoadBalancer::SelectIn in; // Exactly same with SelectIn.begin_time_us, may be different from
// controller->_begin_time_us which is beginning of the RPC.
int64_t begin_time_us;
// Remote side of the call.
SocketId server_id; SocketId server_id;
// A RPC may have multiple calls, this error may be different from
// controller->ErrorCode();
int error_code; int error_code;
// The controller for the RPC. Should NOT be saved in Feedback()
// and used after the function.
const Controller* controller;
}; };
LoadBalancer() { } LoadBalancer() { }
......
...@@ -27,12 +27,12 @@ ...@@ -27,12 +27,12 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
DEFINE_bool(quadratic_latency, false, "Divide square of latency"); DEFINE_int64(min_weight, 1000, "Minimum weight of a node in LALB");
DEFINE_int64(min_weight, 1000, "minimum weight"); DEFINE_double(punish_inflight_ratio, 1.5, "Decrease weight proportionally if "
DEFINE_int64(dev_multiple, 0, "Multiple of deviation"); "average latency of the inflight requests exeeds average "
DEFINE_bool(count_inflight, true, "Adjust weight by inflight requests"); "latency of the node times this ratio");
DEFINE_double(punish_error_ratio, 1.2,
BRPC_VALIDATE_GFLAG(dev_multiple, NonNegativeInteger); "Multiply latencies caused by errors with this ratio");
static const int64_t DEFAULT_QPS = 1; static const int64_t DEFAULT_QPS = 1;
static const size_t INITIAL_WEIGHT_TREE_SIZE = 128; static const size_t INITIAL_WEIGHT_TREE_SIZE = 128;
...@@ -304,12 +304,12 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -304,12 +304,12 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
} }
} else if (Socket::Address(info.server_id, out->ptr) == 0 } else if (Socket::Address(info.server_id, out->ptr) == 0
&& !(*out->ptr)->IsLogOff()) { && !(*out->ptr)->IsLogOff()) {
if (!FLAGS_count_inflight) {
return 0;
}
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again. // choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) { || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
if (!in.changable_weights) {
return 0;
}
const Weight::AddInflightResult r = const Weight::AddInflightResult r =
info.weight->AddInflight(in, index, dice - left); info.weight->AddInflight(in, index, dice - left);
if (r.weight_diff) { if (r.weight_diff) {
...@@ -324,7 +324,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -324,7 +324,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
if (++ntry >= n) { if (++ntry >= n) {
break; break;
} }
} else if (FLAGS_count_inflight) { } else if (in.changable_weights) {
const int64_t diff = const int64_t diff =
info.weight->MarkFailed(index, total / n); info.weight->MarkFailed(index, total / n);
if (diff) { if (diff) {
...@@ -349,19 +349,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) ...@@ -349,19 +349,7 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
return EHOSTDOWN; return EHOSTDOWN;
} }
void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) { void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
// Make latency larger when error is caused by backup request or timedout.
// If we don't do this, there's not much difference between a timedout
// request and a successful request camed back just before timeout.
int64_t latency_percent = 0;
if (info.error_code == 0) {
latency_percent = 100;
} else if (info.error_code == EBACKUPREQUEST) {
latency_percent = 150;
} else if (info.error_code == ERPCTIMEDOUT) {
latency_percent = 200;
} // else we will not update any weight on other errors
butil::DoublyBufferedData<Servers>::ScopedPtr s; butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) { if (_db_servers.Read(&s) != 0) {
return; return;
...@@ -372,7 +360,7 @@ void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) { ...@@ -372,7 +360,7 @@ void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
} }
const size_t index = *pindex; const size_t index = *pindex;
Weight* w = s->weight_tree[index].weight; Weight* w = s->weight_tree[index].weight;
const int64_t diff = w->Update(info, index, latency_percent); const int64_t diff = w->Update(info, index);
if (diff != 0) { if (diff != 0) {
s->UpdateParentWeights(diff, index); s->UpdateParentWeights(diff, index);
_total.fetch_add(diff, butil::memory_order_relaxed); _total.fetch_add(diff, butil::memory_order_relaxed);
...@@ -380,8 +368,9 @@ void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) { ...@@ -380,8 +368,9 @@ void LocalityAwareLoadBalancer::Feedback(const CallInfo& info) {
} }
int64_t LocalityAwareLoadBalancer::Weight::Update( int64_t LocalityAwareLoadBalancer::Weight::Update(
const CallInfo& info, size_t index, int64_t latency_percent) { const CallInfo& ci, size_t index) {
const int64_t end_time_us = butil::gettimeofday_us(); const int64_t end_time_us = butil::gettimeofday_us();
const int64_t latency = end_time_us - ci.begin_time_us;
BAIDU_SCOPED_LOCK(_mutex); BAIDU_SCOPED_LOCK(_mutex);
if (Disabled()) { if (Disabled()) {
// The weight was disabled and will be removed soon, do nothing // The weight was disabled and will be removed soon, do nothing
...@@ -389,46 +378,85 @@ int64_t LocalityAwareLoadBalancer::Weight::Update( ...@@ -389,46 +378,85 @@ int64_t LocalityAwareLoadBalancer::Weight::Update(
return 0; return 0;
} }
_begin_time_sum -= info.in.begin_time_us; _begin_time_sum -= ci.begin_time_us;
--_begin_time_count; --_begin_time_count;
const int64_t latency =
(end_time_us - info.in.begin_time_us) * latency_percent / 100;
if (latency <= 0) { if (latency <= 0) {
// error happens or time skews, ignore the sample. // time skews, ignore the sample.
return 0; return 0;
} }
if (ci.error_code == 0) {
// Update latency and QPS // Add a new entry
TimeInfo tm_info = { latency, end_time_us, latency * (double)latency }; TimeInfo tm_info = { latency, end_time_us };
if (!_time_q.empty()) { if (!_time_q.empty()) {
tm_info.latency_sum += _time_q.bottom()->latency_sum; tm_info.latency_sum += _time_q.bottom()->latency_sum;
tm_info.squared_latency_sum += _time_q.bottom()->squared_latency_sum; }
_time_q.elim_push(tm_info);
} else {
// Accumulate into the last entry so that errors always decrease
// the overall QPS and latency.
// Note that the latency used is linearly mixed from the real latency
// (of an errorous call) and the timeout, so that errors that are more
// unlikely to be solved by later retries are punished more.
// Examples:
// max_retry=0: always use timeout
// max_retry=1, retried=0: latency
// max_retry=1, retried=1: timeout
// max_retry=2, retried=0: latency
// max_retry=2, retried=1: (latency + timeout) / 2
// max_retry=2, retried=2: timeout
// ...
int ndone = 1;
int nleft = 0;
if (ci.controller->max_retry() > 0) {
ndone = ci.controller->retried_count();
nleft = ci.controller->max_retry() - ndone;
}
const int64_t err_latency =
(nleft * (int64_t)(latency * FLAGS_punish_error_ratio)
+ ndone * ci.controller->timeout_ms() * 1000L) / (ndone + nleft);
if (!_time_q.empty()) {
TimeInfo* ti = _time_q.bottom();
ti->latency_sum += err_latency;
ti->end_time_us = end_time_us;
} else {
// If the first response is error, enlarge the latency as timedout
// since we know nothing about the normal latency yet.
const TimeInfo tm_info = {
std::max(err_latency, ci.controller->timeout_ms() * 1000L),
end_time_us
};
_time_q.push(tm_info);
}
} }
_time_q.elim_push(tm_info);
const int64_t top = _time_q.top()->end_time_us; const int64_t top_time_us = _time_q.top()->end_time_us;
const size_t n = _time_q.size(); const size_t n = _time_q.size();
int64_t scaled_qps = DEFAULT_QPS * WEIGHT_SCALE; int64_t scaled_qps = DEFAULT_QPS * WEIGHT_SCALE;
if (end_time_us > top) { if (end_time_us > top_time_us) {
// will not overflow. // Only calculate scaled_qps when the queue is full or the elapse
scaled_qps = (n - 1) * 1000000L * WEIGHT_SCALE / (end_time_us - top); // between bottom and top is reasonably large(so that error of the
if (scaled_qps < WEIGHT_SCALE) { // calculated QPS is probably smaller).
scaled_qps = WEIGHT_SCALE; if (n == _time_q.capacity() ||
end_time_us >= top_time_us + 1000000L/*1s*/) {
// will not overflow.
scaled_qps = (n - 1) * 1000000L * WEIGHT_SCALE / (end_time_us - top_time_us);
if (scaled_qps < WEIGHT_SCALE) {
scaled_qps = WEIGHT_SCALE;
}
} }
_avg_latency = (tm_info.latency_sum - _time_q.top()->latency_sum) / (n - 1); _avg_latency = (_time_q.bottom()->latency_sum -
double avg_squared_latency = (tm_info.squared_latency_sum - _time_q.top()->latency_sum) / (n - 1);
_time_q.top()->squared_latency_sum) / (n - 1); } else if (n == 1) {
_dev = (int64_t)sqrt(avg_squared_latency - _avg_latency * (double)_avg_latency); _avg_latency = _time_q.bottom()->latency_sum;
} else { } else {
_avg_latency = latency; // end_time_us <= top_time_us && n > 1: the QPS is so high that
_dev = _avg_latency; // the time elapse between top and bottom is 0(possible in examples),
} // or time skews, we don't update the weight for safety.
return 0;
if (FLAGS_quadratic_latency) {
_base_weight = scaled_qps / _avg_latency * 100000 / _avg_latency;
} else {
_base_weight = scaled_qps / _avg_latency;
} }
_base_weight = scaled_qps / _avg_latency;
return ResetWeight(index, end_time_us); return ResetWeight(index, end_time_us);
} }
...@@ -449,9 +477,8 @@ void LocalityAwareLoadBalancer::Weight::Describe(std::ostream& os, int64_t now) ...@@ -449,9 +477,8 @@ void LocalityAwareLoadBalancer::Weight::Describe(std::ostream& os, int64_t now)
size_t n = _time_q.size(); size_t n = _time_q.size();
double qps = 0; double qps = 0;
int64_t avg_latency = _avg_latency; int64_t avg_latency = _avg_latency;
int64_t dev = _dev;
if (n <= 1UL) { if (n <= 1UL) {
qps = DEFAULT_QPS; qps = 0;
} else { } else {
if (n == _time_q.capacity()) { if (n == _time_q.capacity()) {
--n; --n;
...@@ -471,7 +498,6 @@ void LocalityAwareLoadBalancer::Weight::Describe(std::ostream& os, int64_t now) ...@@ -471,7 +498,6 @@ void LocalityAwareLoadBalancer::Weight::Describe(std::ostream& os, int64_t now)
os << " inflight_delay=0"; os << " inflight_delay=0";
} }
os << " avg_latency=" << avg_latency os << " avg_latency=" << avg_latency
<< " dev=" << dev
<< " expected_qps=" << qps; << " expected_qps=" << qps;
} }
...@@ -492,7 +518,14 @@ void LocalityAwareLoadBalancer::Describe( ...@@ -492,7 +518,14 @@ void LocalityAwareLoadBalancer::Describe(
os << '['; os << '[';
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
const ServerInfo & info = s->weight_tree[i]; const ServerInfo & info = s->weight_tree[i];
os << "\n{id=" << info.server_id << " left=" os << "\n{id=" << info.server_id;
{
SocketUniquePtr tmp_ptr;
if (Socket::Address(info.server_id, &tmp_ptr) != 0) {
os << "(broken)";
}
}
os << " left="
<< info.left->load(butil::memory_order_relaxed) << ' '; << info.left->load(butil::memory_order_relaxed) << ' ';
info.weight->Describe(os, now); info.weight->Describe(os, now);
os << '}'; os << '}';
...@@ -511,7 +544,6 @@ LocalityAwareLoadBalancer::Weight::Weight(int64_t initial_weight) ...@@ -511,7 +544,6 @@ LocalityAwareLoadBalancer::Weight::Weight(int64_t initial_weight)
, _old_index((size_t)-1L) , _old_index((size_t)-1L)
, _old_weight(0) , _old_weight(0)
, _avg_latency(0) , _avg_latency(0)
, _dev(0)
, _time_q(_time_q_items, sizeof(_time_q_items), butil::NOT_OWN_STORAGE) { , _time_q(_time_q_items, sizeof(_time_q_items), butil::NOT_OWN_STORAGE) {
} }
......
...@@ -31,7 +31,7 @@ namespace brpc { ...@@ -31,7 +31,7 @@ namespace brpc {
namespace policy { namespace policy {
DECLARE_int64(min_weight); DECLARE_int64(min_weight);
DECLARE_int64(dev_multiple); DECLARE_double(punish_inflight_ratio);
// Locality-aware is an iterative algorithm to send requests to servers which // Locality-aware is an iterative algorithm to send requests to servers which
// have lowest expected latencies. Read docs/cn/lalb.md to get a peek at the // have lowest expected latencies. Read docs/cn/lalb.md to get a peek at the
...@@ -54,7 +54,6 @@ private: ...@@ -54,7 +54,6 @@ private:
struct TimeInfo { struct TimeInfo {
int64_t latency_sum; // microseconds int64_t latency_sum; // microseconds
int64_t end_time_us; int64_t end_time_us;
double squared_latency_sum; // for calculating deviation
}; };
class Servers; class Servers;
...@@ -68,7 +67,7 @@ private: ...@@ -68,7 +67,7 @@ private:
// Called in Feedback() to recalculate _weight. // Called in Feedback() to recalculate _weight.
// Returns diff of _weight. // Returns diff of _weight.
int64_t Update(const CallInfo&, size_t index, int64_t latency_percent); int64_t Update(const CallInfo&, size_t index);
// Weight of self. Notice that this value may change at any time. // Weight of self. Notice that this value may change at any time.
int64_t volatile_value() const { return _weight; } int64_t volatile_value() const { return _weight; }
...@@ -100,7 +99,6 @@ private: ...@@ -100,7 +99,6 @@ private:
size_t _old_index; size_t _old_index;
int64_t _old_weight; int64_t _old_weight;
int64_t _avg_latency; int64_t _avg_latency;
int64_t _dev;
butil::BoundedQueue<TimeInfo> _time_q; butil::BoundedQueue<TimeInfo> _time_q;
// content of _time_q // content of _time_q
TimeInfo _time_q_items[RECV_QUEUE_SIZE]; TimeInfo _time_q_items[RECV_QUEUE_SIZE];
...@@ -168,15 +166,10 @@ inline int64_t LocalityAwareLoadBalancer::Weight::ResetWeight( ...@@ -168,15 +166,10 @@ inline int64_t LocalityAwareLoadBalancer::Weight::ResetWeight(
if (_begin_time_count > 0) { if (_begin_time_count > 0) {
const int64_t inflight_delay = const int64_t inflight_delay =
now_us - _begin_time_sum / _begin_time_count; now_us - _begin_time_sum / _begin_time_count;
// note: we only punish latencies at least twice of average latency const int64_t punish_latency =
// when FLAGS_dev_multiple is 0. (int64_t)(_avg_latency * FLAGS_punish_inflight_ratio);
int64_t punish_latency = _avg_latency * 2;
const int64_t dev = FLAGS_dev_multiple * _dev;
if (dev > 0) {
punish_latency = _avg_latency + dev;
}
if (inflight_delay >= punish_latency && _avg_latency > 0) { if (inflight_delay >= punish_latency && _avg_latency > 0) {
new_weight = new_weight * _avg_latency / inflight_delay; new_weight = new_weight * punish_latency / inflight_delay;
} }
} }
if (new_weight < FLAGS_min_weight) { if (new_weight < FLAGS_min_weight) {
......
...@@ -284,6 +284,7 @@ Sender::Sender(Controller* cntl, ...@@ -284,6 +284,7 @@ Sender::Sender(Controller* cntl,
int Sender::IssueRPC(int64_t start_realtime_us) { int Sender::IssueRPC(int64_t start_realtime_us) {
_main_cntl->_current_call.need_feedback = false; _main_cntl->_current_call.need_feedback = false;
LoadBalancer::SelectIn sel_in = { start_realtime_us, LoadBalancer::SelectIn sel_in = { start_realtime_us,
true,
_main_cntl->has_request_code(), _main_cntl->has_request_code(),
_main_cntl->_request_code, _main_cntl->_request_code,
_main_cntl->_accessed }; _main_cntl->_accessed };
......
...@@ -74,6 +74,17 @@ DEFINE_int32(max_connection_pool_size, 100, ...@@ -74,6 +74,17 @@ DEFINE_int32(max_connection_pool_size, 100,
"maximum pooled connection count to a single endpoint"); "maximum pooled connection count to a single endpoint");
BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate); BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
DEFINE_int32(connect_timeout_as_unreachable, 3,
"If the socket failed to connect due to ETIMEDOUT for so many "
"times *continuously*, the error is changed to ENETUNREACH which "
"fails the main socket as well when this socket is pooled.");
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
return v >= 2 && v < 1000/*large enough*/;
}
BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
validate_connect_timeout_as_unreachable);
const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;
#ifdef BAIDU_INTERNAL #ifdef BAIDU_INTERNAL
...@@ -144,10 +155,13 @@ public: ...@@ -144,10 +155,13 @@ public:
// which has the disadvantage that accesses to different pools contend // which has the disadvantage that accesses to different pools contend
// with each other. // with each other.
butil::atomic<SocketPool*> socket_pool; butil::atomic<SocketPool*> socket_pool;
// The socket newing this object. // The socket newing this object.
SocketId creator_socket_id; SocketId creator_socket_id;
// Counting number of continuous ETIMEDOUT
butil::atomic<int> num_continuous_connect_timeouts;
// _in_size, _in_num_messages, _out_size, _out_num_messages of pooled // _in_size, _in_num_messages, _out_size, _out_num_messages of pooled
// sockets are counted into the corresponding fields in their _main_socket. // sockets are counted into the corresponding fields in their _main_socket.
butil::atomic<size_t> in_size; butil::atomic<size_t> in_size;
...@@ -168,6 +182,7 @@ public: ...@@ -168,6 +182,7 @@ public:
Socket::SharedPart::SharedPart(SocketId creator_socket_id2) Socket::SharedPart::SharedPart(SocketId creator_socket_id2)
: socket_pool(NULL) : socket_pool(NULL)
, creator_socket_id(creator_socket_id2) , creator_socket_id(creator_socket_id2)
, num_continuous_connect_timeouts(0)
, in_size(0) , in_size(0)
, in_num_messages(0) , in_num_messages(0)
, out_size(0) , out_size(0)
...@@ -1320,6 +1335,10 @@ void Socket::AfterAppConnected(int err, void* data) { ...@@ -1320,6 +1335,10 @@ void Socket::AfterAppConnected(int err, void* data) {
WriteRequest* req = static_cast<WriteRequest*>(data); WriteRequest* req = static_cast<WriteRequest*>(data);
if (err == 0) { if (err == 0) {
Socket* const s = req->socket; Socket* const s = req->socket;
SharedPart* sp = s->GetSharedPart();
if (sp) {
sp->num_continuous_connect_timeouts.store(0, butil::memory_order_relaxed);
}
// requests are not setup yet. check the comment on Setup() in Write() // requests are not setup yet. check the comment on Setup() in Write()
req->Setup(s); req->Setup(s);
bthread_t th; bthread_t th;
...@@ -1330,6 +1349,18 @@ void Socket::AfterAppConnected(int err, void* data) { ...@@ -1330,6 +1349,18 @@ void Socket::AfterAppConnected(int err, void* data) {
} }
} else { } else {
SocketUniquePtr s(req->socket); SocketUniquePtr s(req->socket);
if (err == ETIMEDOUT) {
SharedPart* sp = s->GetOrNewSharedPart();
if (sp->num_continuous_connect_timeouts.fetch_add(
1, butil::memory_order_relaxed) + 1 >=
FLAGS_connect_timeout_as_unreachable) {
// the race between store and fetch_add(in another thread) is
// OK since a critial error is about to return.
sp->num_continuous_connect_timeouts.store(
0, butil::memory_order_relaxed);
err = ENETUNREACH;
}
}
s->SetFailed(err, "Fail to connect %s: %s", s->SetFailed(err, "Fail to connect %s: %s",
s->description().c_str(), berror(err)); s->description().c_str(), berror(err));
s->ReleaseAllFailedWriteRequests(req); s->ReleaseAllFailedWriteRequests(req);
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
namespace brpc { namespace brpc {
namespace policy { namespace policy {
DECLARE_bool(count_inflight);
extern uint32_t CRCHash32(const char *key, size_t len); extern uint32_t CRCHash32(const char *key, size_t len);
}} }}
...@@ -201,7 +200,7 @@ void* select_server(void* arg) { ...@@ -201,7 +200,7 @@ void* select_server(void* arg) {
brpc::LoadBalancer* c = sa->lb; brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap; CountMap *selected_count = new CountMap;
brpc::LoadBalancer::SelectIn in = { 0, false, 0u, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
uint32_t rand_seed = rand(); uint32_t rand_seed = rand();
if (sa->hash) { if (sa->hash) {
...@@ -233,9 +232,6 @@ class SaveRecycle : public brpc::SocketUser { ...@@ -233,9 +232,6 @@ class SaveRecycle : public brpc::SocketUser {
}; };
TEST_F(LoadBalancerTest, update_while_selection) { TEST_F(LoadBalancerTest, update_while_selection) {
const bool saved = brpc::policy::FLAGS_count_inflight;
brpc::policy::FLAGS_count_inflight = false;
for (size_t round = 0; round < 4; ++round) { for (size_t round = 0; round < 4; ++round) {
brpc::LoadBalancer* lb = NULL; brpc::LoadBalancer* lb = NULL;
SelectArg sa = { NULL, NULL}; SelectArg sa = { NULL, NULL};
...@@ -256,7 +252,7 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -256,7 +252,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
// Accessing empty lb should result in error. // Accessing empty lb should result in error.
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, true, 0, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, true, 0, NULL };
brpc::LoadBalancer::SelectOut out(&ptr); brpc::LoadBalancer::SelectOut out(&ptr);
ASSERT_EQ(ENODATA, lb->SelectServer(in, &out)); ASSERT_EQ(ENODATA, lb->SelectServer(in, &out));
...@@ -344,12 +340,9 @@ TEST_F(LoadBalancerTest, update_while_selection) { ...@@ -344,12 +340,9 @@ TEST_F(LoadBalancerTest, update_while_selection) {
} }
delete lb; delete lb;
} }
brpc::policy::FLAGS_count_inflight = saved;
} }
TEST_F(LoadBalancerTest, fairness) { TEST_F(LoadBalancerTest, fairness) {
const bool saved = brpc::policy::FLAGS_count_inflight;
brpc::policy::FLAGS_count_inflight = false;
for (size_t round = 0; round < 4; ++round) { for (size_t round = 0; round < 4; ++round) {
brpc::LoadBalancer* lb = NULL; brpc::LoadBalancer* lb = NULL;
SelectArg sa = { NULL, NULL}; SelectArg sa = { NULL, NULL};
...@@ -447,7 +440,6 @@ TEST_F(LoadBalancerTest, fairness) { ...@@ -447,7 +440,6 @@ TEST_F(LoadBalancerTest, fairness) {
} }
delete lb; delete lb;
} }
brpc::policy::FLAGS_count_inflight = saved;
} }
TEST_F(LoadBalancerTest, consistent_hashing) { TEST_F(LoadBalancerTest, consistent_hashing) {
...@@ -492,7 +484,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) { ...@@ -492,7 +484,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
const size_t SELECT_TIMES = 1000000; const size_t SELECT_TIMES = 1000000;
std::map<butil::EndPoint, size_t> times; std::map<butil::EndPoint, size_t> times;
brpc::SocketUniquePtr ptr; brpc::SocketUniquePtr ptr;
brpc::LoadBalancer::SelectIn in = { 0, false, 0u, NULL }; brpc::LoadBalancer::SelectIn in = { 0, false, false, 0u, NULL };
::brpc::LoadBalancer::SelectOut out(&ptr); ::brpc::LoadBalancer::SelectOut out(&ptr);
for (size_t i = 0; i < SELECT_TIMES; ++i) { for (size_t i = 0; i < SELECT_TIMES; ++i) {
in.has_request_code = true; in.has_request_code = true;
......
...@@ -53,7 +53,7 @@ TEST_F(NamingServiceFilterTest, sanity) { ...@@ -53,7 +53,7 @@ TEST_F(NamingServiceFilterTest, sanity) {
ASSERT_EQ(0, butil::hostname2endpoint("10.128.0.1:1234", &ep)); ASSERT_EQ(0, butil::hostname2endpoint("10.128.0.1:1234", &ep));
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
brpc::SocketUniquePtr tmp_sock; brpc::SocketUniquePtr tmp_sock;
brpc::LoadBalancer::SelectIn sel_in = { 0, false, 0, NULL }; brpc::LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL };
brpc::LoadBalancer::SelectOut sel_out(&tmp_sock); brpc::LoadBalancer::SelectOut sel_out(&tmp_sock);
ASSERT_EQ(0, channel._lb->SelectServer(sel_in, &sel_out)); ASSERT_EQ(0, channel._lb->SelectServer(sel_in, &sel_out));
ASSERT_EQ(ep, tmp_sock->remote_side()); ASSERT_EQ(ep, tmp_sock->remote_side());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment