Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
084c1e7a
Commit
084c1e7a
authored
Mar 20, 2019
by
zhujiashun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
health_check_using_rpc: health check after revive & add UT
parent
2dc9cbad
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
186 additions
and
117 deletions
+186
-117
channel.cpp
src/brpc/channel.cpp
+1
-1
controller.cpp
src/brpc/controller.cpp
+9
-18
load_balancer.h
src/brpc/load_balancer.h
+1
-0
consistent_hashing_load_balancer.cpp
src/brpc/policy/consistent_hashing_load_balancer.cpp
+2
-1
dynpart_load_balancer.cpp
src/brpc/policy/dynpart_load_balancer.cpp
+2
-1
locality_aware_load_balancer.cpp
src/brpc/policy/locality_aware_load_balancer.cpp
+2
-1
randomized_load_balancer.cpp
src/brpc/policy/randomized_load_balancer.cpp
+2
-1
round_robin_load_balancer.cpp
src/brpc/policy/round_robin_load_balancer.cpp
+2
-1
weighted_round_robin_load_balancer.cpp
src/brpc/policy/weighted_round_robin_load_balancer.cpp
+2
-1
selective_channel.cpp
src/brpc/selective_channel.cpp
+2
-1
socket.cpp
src/brpc/socket.cpp
+17
-17
socket.h
src/brpc/socket.h
+2
-2
socket_inl.h
src/brpc/socket_inl.h
+4
-0
brpc_load_balancer_unittest.cpp
test/brpc_load_balancer_unittest.cpp
+98
-7
brpc_socket_unittest.cpp
test/brpc_socket_unittest.cpp
+40
-65
No files found.
src/brpc/channel.cpp
View file @
084c1e7a
...
...
@@ -571,7 +571,7 @@ int Channel::CheckHealth() {
return
-
1
;
}
else
{
SocketUniquePtr
tmp_sock
;
LoadBalancer
::
SelectIn
sel_in
=
{
0
,
false
,
false
,
0
,
NULL
};
LoadBalancer
::
SelectIn
sel_in
=
{
0
,
false
,
false
,
0
,
NULL
,
false
};
LoadBalancer
::
SelectOut
sel_out
(
&
tmp_sock
);
return
_lb
->
SelectServer
(
sel_in
,
&
sel_out
);
}
...
...
src/brpc/controller.cpp
View file @
084c1e7a
...
...
@@ -986,28 +986,19 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
if
(
SingleServer
())
{
// Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call.
if
(
!
health_check_call
)
{
const
int
rc
=
Socket
::
Address
(
_single_server_id
,
&
tmp_sock
);
if
(
rc
!=
0
||
tmp_sock
->
IsLogOff
())
{
SetFailed
(
EHOSTDOWN
,
"Not connected to %s yet, server_id=%"
PRIu64
,
endpoint2str
(
_remote_side
).
c_str
(),
_single_server_id
);
tmp_sock
.
reset
();
// Release ref ASAP
return
HandleSendFailed
();
}
}
else
{
const
int
rc
=
Socket
::
AddressFailedAsWell
(
_single_server_id
,
&
tmp_sock
);
if
(
rc
<
0
)
{
SetFailed
(
EFAILEDSOCKET
,
"Socket to %s has been recycled, server_id=%"
PRIu64
,
endpoint2str
(
_remote_side
).
c_str
(),
_single_server_id
);
tmp_sock
.
reset
();
// Release ref ASAP
return
HandleSendFailed
();
}
const
int
rc
=
Socket
::
Address
(
_single_server_id
,
&
tmp_sock
);
if
(
rc
!=
0
||
tmp_sock
->
IsLogOff
()
||
(
!
health_check_call
&&
tmp_sock
->
IsHealthCheckingUsingRPC
()))
{
SetFailed
(
EHOSTDOWN
,
"Not connected to %s yet, server_id=%"
PRIu64
,
endpoint2str
(
_remote_side
).
c_str
(),
_single_server_id
);
tmp_sock
.
reset
();
// Release ref ASAP
return
HandleSendFailed
();
}
_current_call
.
peer_id
=
_single_server_id
;
}
else
{
LoadBalancer
::
SelectIn
sel_in
=
{
start_realtime_us
,
true
,
has_request_code
(),
_request_code
,
_accessed
};
{
start_realtime_us
,
true
,
has_request_code
(),
_request_code
,
_accessed
,
health_check_call
};
LoadBalancer
::
SelectOut
sel_out
(
&
tmp_sock
);
const
int
rc
=
_lb
->
SelectServer
(
sel_in
,
&
sel_out
);
if
(
rc
!=
0
)
{
...
...
src/brpc/load_balancer.h
View file @
084c1e7a
...
...
@@ -40,6 +40,7 @@ public:
bool
has_request_code
;
uint64_t
request_code
;
const
ExcludedServers
*
excluded
;
bool
health_check_call
;
};
struct
SelectOut
{
...
...
src/brpc/policy/consistent_hashing_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -221,7 +221,8 @@ int ConsistentHashingLoadBalancer::SelectServer(
if
(((
i
+
1
)
==
s
->
size
()
// always take last chance
||
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
choice
->
server_sock
.
id
))
&&
Socket
::
Address
(
choice
->
server_sock
.
id
,
out
->
ptr
)
==
0
&&
!
(
*
out
->
ptr
)
->
IsLogOff
())
{
&&
!
(
*
out
->
ptr
)
->
IsLogOff
()
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
return
0
;
}
else
{
if
(
++
choice
==
s
->
end
())
{
...
...
src/brpc/policy/dynpart_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -122,7 +122,8 @@ int DynPartLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
for
(
size_t
i
=
0
;
i
<
n
;
++
i
)
{
const
SocketId
id
=
s
->
server_list
[
i
].
id
;
if
((
!
exclusion
||
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
id
))
&&
Socket
::
Address
(
id
,
&
ptrs
[
nptr
].
first
)
==
0
)
{
&&
Socket
::
Address
(
id
,
&
ptrs
[
nptr
].
first
)
==
0
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
int
w
=
schan
::
GetSubChannelWeight
(
ptrs
[
nptr
].
first
->
user
());
total_weight
+=
w
;
if
(
nptr
<
8
)
{
...
...
src/brpc/policy/locality_aware_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -303,7 +303,8 @@ int LocalityAwareLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out)
continue
;
}
}
else
if
(
Socket
::
Address
(
info
.
server_id
,
out
->
ptr
)
==
0
&&
!
(
*
out
->
ptr
)
->
IsLogOff
())
{
&&
!
(
*
out
->
ptr
)
->
IsLogOff
()
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
if
((
ntry
+
1
)
==
n
// Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
||
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
info
.
server_id
))
{
...
...
src/brpc/policy/randomized_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -118,7 +118,8 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if
(((
i
+
1
)
==
n
// always take last chance
||
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
id
))
&&
Socket
::
Address
(
id
,
out
->
ptr
)
==
0
&&
!
(
*
out
->
ptr
)
->
IsLogOff
())
{
&&
!
(
*
out
->
ptr
)
->
IsLogOff
()
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
// We found an available server
return
0
;
}
...
...
src/brpc/policy/round_robin_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -122,7 +122,8 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
if
(((
i
+
1
)
==
n
// always take last chance
||
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
id
))
&&
Socket
::
Address
(
id
,
out
->
ptr
)
==
0
&&
!
(
*
out
->
ptr
)
->
IsLogOff
())
{
&&
!
(
*
out
->
ptr
)
->
IsLogOff
()
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
s
.
tls
()
=
tls
;
return
0
;
}
...
...
src/brpc/policy/weighted_round_robin_load_balancer.cpp
View file @
084c1e7a
...
...
@@ -180,7 +180,8 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
SocketId
server_id
=
GetServerInNextStride
(
s
->
server_list
,
filter
,
tls_temp
);
if
(
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
server_id
)
&&
Socket
::
Address
(
server_id
,
out
->
ptr
)
==
0
&&
!
(
*
out
->
ptr
)
->
IsLogOff
())
{
&&
!
(
*
out
->
ptr
)
->
IsLogOff
()
&&
(
in
.
health_check_call
||
!
(
*
out
->
ptr
)
->
IsHealthCheckingUsingRPC
()))
{
// update tls.
tls
.
remain_server
=
tls_temp
.
remain_server
;
tls
.
position
=
tls_temp
.
position
;
...
...
src/brpc/selective_channel.cpp
View file @
084c1e7a
...
...
@@ -290,7 +290,8 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
true
,
_main_cntl
->
has_request_code
(),
_main_cntl
->
_request_code
,
_main_cntl
->
_accessed
};
_main_cntl
->
_accessed
,
false
};
ChannelBalancer
::
SelectOut
sel_out
;
const
int
rc
=
static_cast
<
ChannelBalancer
*>
(
_main_cntl
->
_lb
.
get
())
->
SelectChannel
(
sel_in
,
&
sel_out
);
...
...
src/brpc/socket.cpp
View file @
084c1e7a
...
...
@@ -479,7 +479,7 @@ Socket::Socket(Forbidden)
,
_epollout_butex
(
NULL
)
,
_write_head
(
NULL
)
,
_stream_set
(
NULL
)
//
, _health_checking_using_rpc(false)
,
_health_checking_using_rpc
(
false
)
{
CreateVarsOnce
();
pthread_mutex_init
(
&
_id_wait_list_mutex
,
NULL
);
...
...
@@ -662,7 +662,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m
->
_error_code
=
0
;
m
->
_error_text
.
clear
();
m
->
_agent_socket_id
.
store
(
INVALID_SOCKET_ID
,
butil
::
memory_order_relaxed
);
//
m->_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
m
->
_health_checking_using_rpc
.
store
(
false
,
butil
::
memory_order_relaxed
);
// NOTE: last two params are useless in bthread > r32787
const
int
rc
=
bthread_id_list_init
(
&
m
->
_id_wait_list
,
512
,
512
);
if
(
rc
)
{
...
...
@@ -754,6 +754,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
_pipeline_q
->
clear
();
}
}
_health_checking_using_rpc
.
store
(
false
,
butil
::
memory_order_relaxed
);
return
0
;
}
...
...
@@ -783,12 +784,14 @@ void Socket::Revive() {
}
// Set this flag to true since we add additional ref again
_recycle_flag
.
store
(
false
,
butil
::
memory_order_relaxed
);
//_health_checking_using_rpc.store(false, butil::memory_order_relaxed);
if
(
_user
)
{
_user
->
AfterRevived
(
this
);
}
else
{
LOG
(
INFO
)
<<
"Revived "
<<
*
this
;
}
if
(
FLAGS_health_check_using_rpc
)
{
_health_checking_using_rpc
.
store
(
true
,
butil
::
memory_order_relaxed
);
}
return
;
}
}
...
...
@@ -874,7 +877,6 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if
(
_health_check_interval_s
>
0
)
{
//!_health_checking_using_rpc.load(butil::memory_order_relaxed)) {
GetOrNewSharedPart
()
->
circuit_breaker
.
MarkAsBroken
();
PeriodicTaskManager
::
StartTaskAt
(
new
HealthCheckTask
(
id
()),
...
...
@@ -1034,34 +1036,31 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
if
(
ptr
->
CreatedByConnect
())
{
s_vars
->
channel_conn
<<
-
1
;
}
if
(
FLAGS_health_check_using_rpc
)
{
//ptr->_health_checking_using_rpc.store(true, butil::memory_order_relaxed);
ptr
->
Revive
();
ptr
->
_hc_count
=
0
;
if
(
ptr
->
IsHealthCheckingUsingRPC
())
{
brpc
::
ChannelOptions
options
;
options
.
protocol
=
"http"
;
options
.
max_retry
=
0
;
options
.
timeout_ms
=
FLAGS_health_check_timeout_ms
;
brpc
::
Channel
channel
;
if
(
channel
.
Init
(
_id
,
&
options
)
!=
0
)
{
++
ptr
->
_hc_count
;
*
next_abstime
=
butil
::
seconds_from_now
(
ptr
->
_health_check_interval_s
);
return
tru
e
;
// SetFailed() again to trigger next round of health checking
ptr
->
SetFailed
(
);
return
fals
e
;
}
brpc
::
Controller
cntl
;
cntl
.
http_request
().
uri
()
=
FLAGS_health_check_path
;
cntl
.
set_health_check_call
(
true
);
channel
.
CallMethod
(
NULL
,
&
cntl
,
NULL
,
NULL
,
NULL
);
if
(
cntl
.
Failed
())
{
LOG
(
WARNING
)
<<
"Fail to health check using rpc, error="
RPC_VLOG
<<
"Fail to health check using rpc, error="
<<
cntl
.
ErrorText
();
++
ptr
->
_hc_count
;
*
next_abstime
=
butil
::
seconds_from_now
(
ptr
->
_health_check_interval_s
);
return
true
;
ptr
->
SetFailed
();
return
false
;
}
LOG
(
INFO
)
<<
"Succeed to health check using rpc"
;
ptr
->
ResetHealthCheckingUsingRPC
()
;
}
ptr
->
Revive
();
ptr
->
_hc_count
=
0
;
return
false
;
}
else
if
(
hc
==
ESTOP
)
{
LOG
(
INFO
)
<<
"Cancel checking "
<<
*
ptr
;
...
...
@@ -2241,6 +2240,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<<
"
\n
auth_id="
<<
ptr
->
_auth_id
.
value
<<
"
\n
auth_context="
<<
ptr
->
_auth_context
<<
"
\n
logoff_flag="
<<
ptr
->
_logoff_flag
.
load
(
butil
::
memory_order_relaxed
)
// TODO(zhujiashun): add _health_checking_using_rpc
<<
"
\n
recycle_flag="
<<
ptr
->
_recycle_flag
.
load
(
butil
::
memory_order_relaxed
)
<<
"
\n
agent_socket_id="
;
const
SocketId
asid
=
ptr
->
_agent_socket_id
.
load
(
butil
::
memory_order_relaxed
);
...
...
src/brpc/socket.h
View file @
084c1e7a
...
...
@@ -351,7 +351,8 @@ public:
// TODO(zhujiashun)
bool
IsHealthCheckingUsingRPC
()
const
;
void
ResetHealthCheckingUsingRPC
();
// Start to process edge-triggered events from the fd.
// This function does not block caller.
static
int
StartInputEvent
(
SocketId
id
,
uint32_t
events
,
...
...
@@ -797,7 +798,6 @@ private:
// If this flag is set, then the current socket is used to health check
// and should not health check again
butil
::
atomic
<
bool
>
_health_checking_using_rpc
;
};
}
// namespace brpc
...
...
src/brpc/socket_inl.h
View file @
084c1e7a
...
...
@@ -249,6 +249,10 @@ inline bool Socket::IsHealthCheckingUsingRPC() const {
return
_health_checking_using_rpc
.
load
(
butil
::
memory_order_relaxed
);
}
inline
void
Socket
::
ResetHealthCheckingUsingRPC
()
{
_health_checking_using_rpc
.
store
(
false
,
butil
::
memory_order_relaxed
);
}
static
const
uint32_t
EOF_FLAG
=
(
1
<<
31
);
inline
void
Socket
::
PostponeEOF
()
{
...
...
test/brpc_load_balancer_unittest.cpp
View file @
084c1e7a
...
...
@@ -205,7 +205,7 @@ void* select_server(void* arg) {
brpc
::
LoadBalancer
*
c
=
sa
->
lb
;
brpc
::
SocketUniquePtr
ptr
;
CountMap
*
selected_count
=
new
CountMap
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
};
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
uint32_t
rand_seed
=
rand
();
if
(
sa
->
hash
)
{
...
...
@@ -259,7 +259,7 @@ TEST_F(LoadBalancerTest, update_while_selection) {
// Accessing empty lb should result in error.
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
true
,
0
,
NULL
};
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
true
,
0
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
ASSERT_EQ
(
ENODATA
,
lb
->
SelectServer
(
in
,
&
out
));
...
...
@@ -555,7 +555,7 @@ TEST_F(LoadBalancerTest, consistent_hashing) {
const
size_t
SELECT_TIMES
=
1000000
;
std
::
map
<
butil
::
EndPoint
,
size_t
>
times
;
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
};
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
::
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
for
(
size_t
i
=
0
;
i
<
SELECT_TIMES
;
++
i
)
{
in
.
has_request_code
=
true
;
...
...
@@ -632,7 +632,7 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
// consistent with weight configured.
std
::
map
<
butil
::
EndPoint
,
size_t
>
select_result
;
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
};
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
int
total_weight
=
12
;
std
::
vector
<
butil
::
EndPoint
>
select_servers
;
...
...
@@ -647,11 +647,11 @@ TEST_F(LoadBalancerTest, weighted_round_robin) {
}
std
::
cout
<<
std
::
endl
;
// Check whether slected result is consistent with expected.
EXPECT_EQ
(
3
,
select_result
.
size
());
EXPECT_EQ
(
(
size_t
)
3
,
select_result
.
size
());
for
(
const
auto
&
result
:
select_result
)
{
std
::
cout
<<
result
.
first
<<
" result="
<<
result
.
second
<<
" configured="
<<
configed_weight
[
result
.
first
]
<<
std
::
endl
;
EXPECT_EQ
(
result
.
second
,
configed_weight
[
result
.
first
]);
EXPECT_EQ
(
result
.
second
,
(
size_t
)
configed_weight
[
result
.
first
]);
}
}
...
...
@@ -690,10 +690,101 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
// The first socket is excluded. The second socket is logfoff.
// The third socket is invalid.
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
exclude
};
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
exclude
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
EXPECT_EQ
(
EHOSTDOWN
,
wrrlb
.
SelectServer
(
in
,
&
out
));
brpc
::
ExcludedServers
::
Destroy
(
exclude
);
}
TEST_F
(
LoadBalancerTest
,
health_checking_no_valid_server
)
{
// If socket is revived and FLAGS_health_check_using_rpc is set,
// this socket should not be selected.
const
char
*
servers
[]
=
{
"10.92.115.19:8832"
,
"10.42.122.201:8833"
,
};
std
::
vector
<
brpc
::
LoadBalancer
*>
lbs
;
lbs
.
push_back
(
new
brpc
::
policy
::
RoundRobinLoadBalancer
);
lbs
.
push_back
(
new
brpc
::
policy
::
RandomizedLoadBalancer
);
lbs
.
push_back
(
new
brpc
::
policy
::
WeightedRoundRobinLoadBalancer
);
for
(
int
i
=
0
;
i
<
(
int
)
lbs
.
size
();
++
i
)
{
brpc
::
LoadBalancer
*
lb
=
lbs
[
i
];
std
::
vector
<
brpc
::
ServerId
>
ids
;
for
(
size_t
i
=
0
;
i
<
ARRAY_SIZE
(
servers
);
++
i
)
{
butil
::
EndPoint
dummy
;
ASSERT_EQ
(
0
,
str2endpoint
(
servers
[
i
],
&
dummy
));
brpc
::
ServerId
id
(
8888
);
brpc
::
SocketOptions
options
;
options
.
remote_side
=
dummy
;
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Create
(
options
,
&
id
.
id
));
id
.
tag
=
"50"
;
ids
.
push_back
(
id
);
lb
->
AddServer
(
id
);
}
// Without setting anything, the lb should work fine
for
(
int
i
=
0
;
i
<
4
;
++
i
)
{
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
ASSERT_EQ
(
0
,
lb
->
SelectServer
(
in
,
&
out
));
}
brpc
::
SocketUniquePtr
ptr
;
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Address
(
ids
[
0
].
id
,
&
ptr
));
ptr
->
_health_checking_using_rpc
.
store
(
true
,
butil
::
memory_order_relaxed
);
for
(
int
i
=
0
;
i
<
4
;
++
i
)
{
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
ASSERT_EQ
(
0
,
lb
->
SelectServer
(
in
,
&
out
));
// After putting server[0] into health checking state, the only choice is servers[1]
ASSERT_EQ
(
ptr
->
remote_side
().
port
,
8833
);
}
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Address
(
ids
[
1
].
id
,
&
ptr
));
ptr
->
_health_checking_using_rpc
.
store
(
true
,
butil
::
memory_order_relaxed
);
for
(
int
i
=
0
;
i
<
4
;
++
i
)
{
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
// There is no server available
ASSERT_EQ
(
EHOSTDOWN
,
lb
->
SelectServer
(
in
,
&
out
));
}
// set health_check_call to true, the lb should work fine
bool
get_server1
=
false
;
bool
get_server2
=
false
;
// The probability of 20 consecutive same server is 1 / (2^19)
for
(
int
i
=
0
;
i
<
20
;
++
i
)
{
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
true
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
ASSERT_EQ
(
0
,
lb
->
SelectServer
(
in
,
&
out
));
if
(
ptr
->
remote_side
().
port
==
8832
)
{
get_server1
=
true
;
}
else
{
get_server2
=
true
;
}
}
ASSERT_TRUE
(
get_server1
&&
get_server2
);
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Address
(
ids
[
0
].
id
,
&
ptr
));
ptr
->
ResetHealthCheckingUsingRPC
();
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Address
(
ids
[
1
].
id
,
&
ptr
));
ptr
->
ResetHealthCheckingUsingRPC
();
// After reset health checking state, the lb should work fine
for
(
int
i
=
0
;
i
<
4
;
++
i
)
{
brpc
::
SocketUniquePtr
ptr
;
brpc
::
LoadBalancer
::
SelectIn
in
=
{
0
,
false
,
false
,
0u
,
NULL
,
false
};
brpc
::
LoadBalancer
::
SelectOut
out
(
&
ptr
);
ASSERT_EQ
(
0
,
lb
->
SelectServer
(
in
,
&
out
));
}
delete
lb
;
}
}
}
//namespace
test/brpc_socket_unittest.cpp
View file @
084c1e7a
...
...
@@ -7,6 +7,7 @@
#include <sys/socket.h>
#include <fcntl.h> // F_GETFD
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include "butil/gperftools_profiler.h"
#include "butil/time.h"
#include "butil/macros.h"
...
...
@@ -21,6 +22,8 @@
#include "brpc/policy/http_rpc_protocol.h"
#include "brpc/nshead.h"
#include "brpc/server.h"
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "health_check.pb.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
...
...
@@ -32,6 +35,10 @@ namespace bthread {
extern
TaskControl
*
g_task_control
;
}
namespace
brpc
{
DECLARE_int32
(
health_check_interval
);
}
void
EchoProcessHuluRequest
(
brpc
::
InputMessageBase
*
msg_base
);
int
main
(
int
argc
,
char
*
argv
[])
{
...
...
@@ -537,13 +544,9 @@ public:
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
brpc
::
Controller
*
cntl
=
(
brpc
::
Controller
*
)
cntl_base
;
LOG
(
INFO
)
<<
"In HealthCheckTestServiceImpl, flag="
<<
_sleep_flag
;
if
(
_sleep_flag
)
{
bthread_usleep
(
310000
/* 310ms, a little bit longer than the default
timeout of health checking rpc */
);
}
else
{
LOG
(
INFO
)
<<
"Return fast!"
;
}
cntl
->
response_attachment
().
append
(
"OK"
);
}
...
...
@@ -554,80 +557,52 @@ public:
TEST_F
(
SocketTest
,
health_check_using_rpc
)
{
GFLAGS_NS
::
SetCommandLineOption
(
"health_check_using_rpc"
,
"true"
);
GFLAGS_NS
::
SetCommandLineOption
(
"health_check_path"
,
"/HealthCheckTestService"
);
brpc
::
SocketId
id
=
8888
;
butil
::
EndPoint
point
(
butil
::
IP_ANY
,
7777
);
const
int
kCheckInteval
=
1
;
brpc
::
SocketOptions
options
;
options
.
remote_side
=
point
;
options
.
user
=
new
CheckRecycle
;
options
.
health_check_interval_s
=
kCheckInteval
/*s*/
;
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Create
(
options
,
&
id
));
brpc
::
SocketUniquePtr
s
;
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Address
(
id
,
&
s
));
global_sock
=
s
.
get
();
ASSERT_TRUE
(
global_sock
);
int
old_health_check_interval
=
brpc
::
FLAGS_health_check_interval
;
const
char
*
buf
=
"GET / HTTP/1.1
\r\n
Host: brpc.com
\r\n\r\n
"
;
const
bool
use_my_message
=
(
butil
::
fast_rand_less_than
(
2
)
==
0
);
brpc
::
SocketMessagePtr
<
MyMessage
>
msg
;
int
appended_msg
=
0
;
butil
::
IOBuf
src
;
if
(
use_my_message
)
{
LOG
(
INFO
)
<<
"Use MyMessage"
;
msg
.
reset
(
new
MyMessage
(
buf
,
strlen
(
buf
),
&
appended_msg
));
}
else
{
src
.
append
(
buf
,
strlen
(
buf
));
ASSERT_EQ
(
strlen
(
buf
),
src
.
length
());
}
#ifdef CONNECT_IN_KEEPWRITE
bthread_id_t
wait_id
;
WaitData
data
;
ASSERT_EQ
(
0
,
bthread_id_create2
(
&
wait_id
,
&
data
,
OnWaitIdReset
));
brpc
::
Socket
::
WriteOptions
wopt
;
wopt
.
id_wait
=
wait_id
;
if
(
use_my_message
)
{
ASSERT_EQ
(
0
,
s
->
Write
(
msg
,
&
wopt
));
}
else
{
ASSERT_EQ
(
0
,
s
->
Write
(
&
src
,
&
wopt
));
}
ASSERT_EQ
(
0
,
bthread_id_join
(
wait_id
));
ASSERT_EQ
(
wait_id
.
value
,
data
.
id
.
value
);
ASSERT_EQ
(
ECONNREFUSED
,
data
.
error_code
);
ASSERT_TRUE
(
butil
::
StringPiece
(
data
.
error_text
).
starts_with
(
"Fail to connect "
));
if
(
use_my_message
)
{
ASSERT_TRUE
(
appended_msg
);
}
#else
if
(
use_my_message
)
{
ASSERT_EQ
(
-
1
,
s
->
Write
(
msg
));
}
else
{
ASSERT_EQ
(
-
1
,
s
->
Write
(
&
src
));
brpc
::
ChannelOptions
options
;
options
.
protocol
=
"http"
;
options
.
max_retry
=
0
;
brpc
::
Channel
channel
;
ASSERT_EQ
(
0
,
channel
.
Init
(
"127.0.0.1:7777"
,
&
options
));
{
brpc
::
Controller
cntl
;
cntl
.
http_request
().
uri
()
=
"/"
;
channel
.
CallMethod
(
NULL
,
&
cntl
,
NULL
,
NULL
,
NULL
);
EXPECT_TRUE
(
cntl
.
Failed
());
ASSERT_EQ
(
ECONNREFUSED
,
cntl
.
ErrorCode
());
}
ASSERT_EQ
(
ECONNREFUSED
,
errno
);
#endif
ASSERT_TRUE
(
src
.
empty
());
ASSERT_EQ
(
-
1
,
s
->
fd
());
ASSERT_TRUE
(
global_sock
);
brpc
::
SocketUniquePtr
invalid_ptr
;
ASSERT_EQ
(
-
1
,
brpc
::
Socket
::
Address
(
id
,
&
invalid_ptr
));
brpc
::
Server
server
;
HealthCheckTestServiceImpl
hc_service
;
ASSERT_EQ
(
0
,
server
.
AddService
(
&
hc_service
,
brpc
::
SERVER_DOESNT_OWN_SERVICE
));
ASSERT_EQ
(
0
,
server
.
Start
(
"127.0.0.1:7777"
,
NULL
));
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
// although ::connect would succeed, the stall in hc_service makes
// the health checking rpc fail.
ASSERT_EQ
(
1
,
brpc
::
Socket
::
Status
(
id
));
brpc
::
Controller
cntl
;
cntl
.
http_request
().
uri
()
=
"/"
;
channel
.
CallMethod
(
NULL
,
&
cntl
,
NULL
,
NULL
,
NULL
);
ASSERT_EQ
(
EHOSTDOWN
,
cntl
.
ErrorCode
());
bthread_usleep
(
1000000
/*1s*/
);
}
hc_service
.
_sleep_flag
=
false
;
bthread_usleep
(
2000000
);
// recover
ASSERT_EQ
(
0
,
brpc
::
Socket
::
Status
(
id
));
// sleep so long because of the buggy impl of health check with no circuit breaker
// enabled but the sleep time is still exponentially backoff.
bthread_usleep
(
2500000
);
// should recover now
{
brpc
::
Controller
cntl
;
cntl
.
http_request
().
uri
()
=
"/"
;
channel
.
CallMethod
(
NULL
,
&
cntl
,
NULL
,
NULL
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
());
ASSERT_GT
(
cntl
.
response_attachment
().
size
(),
(
size_t
)
0
);
}
GFLAGS_NS
::
SetCommandLineOption
(
"health_check_using_rpc"
,
"false"
);
char
hc_buf
[
8
];
snprintf
(
hc_buf
,
sizeof
(
hc_buf
),
"%d"
,
old_health_check_interval
);
GFLAGS_NS
::
SetCommandLineOption
(
"health_check_interval"
,
hc_buf
);
}
TEST_F
(
SocketTest
,
health_check
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment