Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
d4022e4d
Commit
d4022e4d
authored
Sep 07, 2018
by
zyearn
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
make health checking async
parent
ccd09482
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
76 additions
and
89 deletions
+76
-89
socket.cpp
src/brpc/socket.cpp
+70
-83
socket.h
src/brpc/socket.h
+2
-2
brpc_channel_unittest.cpp
test/brpc_channel_unittest.cpp
+3
-3
brpc_naming_service_unittest.cpp
test/brpc_naming_service_unittest.cpp
+1
-1
No files found.
src/brpc/socket.cpp
View file @
d4022e4d
...
...
@@ -41,6 +41,7 @@
#include "brpc/stream_impl.h"
#include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
...
...
@@ -95,13 +96,6 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
const
int
WAIT_EPOLLOUT_TIMEOUT_MS
=
50
;
#ifdef BAIDU_INTERNAL
#define BRPC_AUXTHREAD_ATTR \
(sizeof(com_device_t) > 32*1024 ? BTHREAD_ATTR_NORMAL : BTHREAD_ATTR_SMALL)
#else
#define BRPC_AUXTHREAD_ATTR BTHREAD_ATTR_SMALL
#endif
class
BAIDU_CACHELINE_ALIGNMENT
SocketPool
{
friend
class
Socket
;
public
:
...
...
@@ -780,6 +774,16 @@ void Socket::Revive() {
}
}
// Periodic task that health-checks the failed Socket identified by a
// SocketId. Instances are heap-allocated and handed to
// PeriodicTaskManager::StartTaskAt() by Socket::SetFailed(); the task
// deletes itself when DoPeriodicTask() is called with a NULL argument.
class HealthCheckTask : public PeriodicTask {
public:
    // `id' is the Socket to be checked.
    explicit HealthCheckTask(SocketId id)
        : _id(id)
        , _first_time(true) {}

    // Runs one round of health checking.
    // - next_abstime == NULL: deletes this task and returns true.
    // - Returns false when the Socket can no longer be addressed, when
    //   WaitAndReset() fails in the first round, when the check succeeds
    //   (the Socket is revived) or when the check reports ESTOP.
    // - Otherwise the check failed: stores the time of the next round in
    //   *next_abstime and returns true.
    bool DoPeriodicTask(timespec* next_abstime);

private:
    // Socket being checked.
    SocketId _id;
    // True until the first round has run; WaitAndReset() is only called
    // in that round.
    bool _first_time;
};
int
Socket
::
ReleaseAdditionalReference
()
{
bool
expect
=
false
;
// Use `relaxed' fence here since `Dereference' has `released' fence
...
...
@@ -826,10 +830,9 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
// by Channel to revive never-connected socket when server side
// comes online.
if
(
_health_check_interval_s
>
0
)
{
bthread_t
th
=
0
;
int
rc
=
bthread_start_background
(
&
th
,
&
BRPC_AUXTHREAD_ATTR
,
HealthCheckThread
,
(
void
*
)
id
());
CHECK_EQ
(
0
,
rc
);
PeriodicTaskManager
::
StartTaskAt
(
new
HealthCheckTask
(
id
()),
butil
::
milliseconds_from_now
(
_health_check_interval_s
*
500
));
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex
->
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
...
...
@@ -927,81 +930,65 @@ int Socket::Status(SocketId id, int32_t* nref) {
return
-
1
;
}
void
*
Socket
::
HealthCheckThread
(
void
*
void_arg
)
{
SocketId
socket_id
=
(
SocketId
)
void_arg
;
bool
first_time
=
true
;
if
(
bthread_usleep
(
100000
)
<
0
)
{
PLOG_IF
(
FATAL
,
errno
!=
ESTOP
)
<<
"Fail to sleep"
;
return
NULL
;
bool
HealthCheckTask
::
DoPeriodicTask
(
timespec
*
next_abstime
)
{
if
(
next_abstime
==
NULL
)
{
delete
this
;
return
true
;
}
for
(;;)
{
butil
::
EndPoint
remote_side
;
int
check_interval_s
=
0
;
do
{
SocketUniquePtr
ptr
;
const
int
rc
=
AddressFailedAsWell
(
socket_id
,
&
ptr
);
CHECK
(
rc
!=
0
);
if
(
rc
<
0
)
{
RPC_VLOG
<<
"SocketId="
<<
socket_id
<<
" was abandoned before health checking"
;
return
NULL
;
}
remote_side
=
ptr
->
remote_side
();
check_interval_s
=
ptr
->
_health_check_interval_s
;
// Note: Making a Socket re-addressable is hard. An alternative is
// creating another Socket with selected internal fields to replace
// the failed Socket. Although it avoids concurrency issues with in-place
// revival, it changes the SocketId: a lot of code would need to watch the
// SocketId and update on change, which is impractical. Another issue with
// this method is that it has to move "selected internal fields"
// which may be accessed in parallel and are not trivial to move.
// Finally we choose a simple-enough solution: wait until the
// reference count hits `expected_nref', which basically means no
// one is addressing the Socket (except here). Because the Socket
// is not addressable, the reference count will not increase
// again. This solution is not perfect because the `expected_nref'
// is implementation-specific. In our case, one reference comes
// from SocketMapInsert (socket_map.cpp) and one reference is here.
// Although WaitAndReset() could hang when someone is addressing
// the failed Socket forever (which would also indicate a bug), this is
// not an issue in the current code.
if
(
first_time
)
{
// Only check at first time.
first_time
=
false
;
if
(
ptr
->
WaitAndReset
(
2
/*note*/
)
!=
0
)
{
LOG
(
INFO
)
<<
"Cancel checking "
<<
*
ptr
;
return
NULL
;
}
}
s_vars
->
nhealthcheck
<<
1
;
int
hc
=
0
;
if
(
ptr
->
_user
)
{
hc
=
ptr
->
_user
->
CheckHealth
(
ptr
.
get
());
}
else
{
hc
=
ptr
->
CheckHealth
();
}
if
(
hc
==
0
)
{
if
(
ptr
->
CreatedByConnect
())
{
s_vars
->
channel_conn
<<
-
1
;
}
ptr
->
Revive
();
ptr
->
_hc_count
=
0
;
return
NULL
;
}
else
if
(
hc
==
ESTOP
)
{
LOG
(
INFO
)
<<
"Cancel checking "
<<
*
ptr
;
return
NULL
;
}
++
ptr
->
_hc_count
;
}
while
(
0
);
CHECK_GT
(
check_interval_s
,
0
);
if
(
bthread_usleep
(
check_interval_s
*
1000000L
)
<
0
)
{
PLOG_IF
(
FATAL
,
errno
!=
ESTOP
)
<<
"Fail to sleep"
;
LOG
(
INFO
)
<<
"Cancel checking SocketId="
<<
socket_id
<<
'@'
<<
remote_side
;
return
NULL
;
SocketUniquePtr
ptr
;
const
int
rc
=
Socket
::
AddressFailedAsWell
(
_id
,
&
ptr
);
CHECK
(
rc
!=
0
);
if
(
rc
<
0
)
{
RPC_VLOG
<<
"SocketId="
<<
_id
<<
" was abandoned before health checking"
;
return
false
;
}
// Note: Making a Socket re-addressable is hard. An alternative is
// creating another Socket with selected internal fields to replace
// the failed Socket. Although it avoids concurrency issues with in-place
// revival, it changes the SocketId: a lot of code would need to watch the
// SocketId and update on change, which is impractical. Another issue with
// this method is that it has to move "selected internal fields"
// which may be accessed in parallel and are not trivial to move.
// Finally we choose a simple-enough solution: wait until the
// reference count hits `expected_nref', which basically means no
// one is addressing the Socket (except here). Because the Socket
// is not addressable, the reference count will not increase
// again. This solution is not perfect because the `expected_nref'
// is implementation-specific. In our case, one reference comes
// from SocketMapInsert (socket_map.cpp) and one reference is here.
// Although WaitAndReset() could hang when someone is addressing
// the failed Socket forever (which would also indicate a bug), this is
// not an issue in the current code.
if
(
_first_time
)
{
// Only check at first time.
_first_time
=
false
;
if
(
ptr
->
WaitAndReset
(
2
/*note*/
)
!=
0
)
{
LOG
(
INFO
)
<<
"Cancel checking "
<<
*
ptr
;
return
false
;
}
}
s_vars
->
nhealthcheck
<<
1
;
int
hc
=
0
;
if
(
ptr
->
_user
)
{
hc
=
ptr
->
_user
->
CheckHealth
(
ptr
.
get
());
}
else
{
hc
=
ptr
->
CheckHealth
();
}
if
(
hc
==
0
)
{
if
(
ptr
->
CreatedByConnect
())
{
s_vars
->
channel_conn
<<
-
1
;
}
ptr
->
Revive
();
ptr
->
_hc_count
=
0
;
return
false
;
}
else
if
(
hc
==
ESTOP
)
{
LOG
(
INFO
)
<<
"Cancel checking "
<<
*
ptr
;
return
false
;
}
++
ptr
->
_hc_count
;
*
next_abstime
=
butil
::
seconds_from_now
(
ptr
->
_health_check_interval_s
);
return
true
;
}
void
Socket
::
OnRecycle
()
{
...
...
src/brpc/socket.h
View file @
d4022e4d
...
...
@@ -178,6 +178,7 @@ friend class Controller;
friend
class
policy
::
ConsistentHashingLoadBalancer
;
friend
class
policy
::
RtmpContext
;
friend
class
schan
::
ChannelBalancer
;
friend
class
HealthCheckTask
;
class
SharedPart
;
struct
Forbidden
{};
struct
WriteRequest
;
...
...
@@ -518,7 +519,6 @@ friend void DereferenceSocket(Socket*);
int
ConnectIfNot
(
const
timespec
*
abstime
,
WriteRequest
*
req
);
int
ResetFileDescriptor
(
int
fd
);
static
void
*
HealthCheckThread
(
void
*
);
// Returns 0 on success, 1 on failed socket, -1 on recycled.
static
int
AddressFailedAsWell
(
SocketId
id
,
SocketUniquePtr
*
ptr
);
...
...
@@ -663,7 +663,7 @@ private:
int
_preferred_index
;
// Number of HC since the last SetFailed() was called. Set to 0 when the
// socket is revived. Only set in HealthCheckThread
// socket is revived. Only set in HealthCheckTask::DoPeriodicTask()
int
_hc_count
;
// Size of current incomplete message, set to 0 on complete.
...
...
test/brpc_channel_unittest.cpp
View file @
d4022e4d
...
...
@@ -36,7 +36,7 @@ namespace policy {
void
SendRpcResponse
(
int64_t
correlation_id
,
Controller
*
cntl
,
const
google
::
protobuf
::
Message
*
req
,
const
google
::
protobuf
::
Message
*
res
,
const
Server
*
server_raw
,
MethodStatus
*
,
long
);
const
Server
*
server_raw
,
MethodStatus
*
,
int64_t
);
}
// policy
}
// brpc
...
...
@@ -230,7 +230,7 @@ protected:
const
google
::
protobuf
::
Message
*
,
const
google
::
protobuf
::
Message
*
,
const
brpc
::
Server
*
,
brpc
::
MethodStatus
*
,
long
>
(
brpc
::
MethodStatus
*
,
int64_t
>
(
&
brpc
::
policy
::
SendRpcResponse
,
meta
.
correlation_id
(),
cntl
,
NULL
,
res
,
&
ts
->
_dummy
,
NULL
,
-
1
);
...
...
@@ -702,7 +702,7 @@ protected:
CallMethod
(
&
subchans
[
0
],
&
cntl
,
&
req
,
&
res
,
false
);
ASSERT_TRUE
(
cntl
.
Failed
());
ASSERT_EQ
(
brpc
::
EINTERNAL
,
cntl
.
ErrorCode
())
<<
cntl
.
ErrorText
();
ASSERT_
EQ
(
"[E2001][127.0.1.1:0]Method ComboEcho() not implemented."
,
cntl
.
ErrorText
(
));
ASSERT_
TRUE
(
butil
::
StringPiece
(
cntl
.
ErrorText
()).
ends_with
(
"Method ComboEcho() not implemented."
));
// do the rpc call.
cntl
.
Reset
();
...
...
test/brpc_naming_service_unittest.cpp
View file @
d4022e4d
...
...
@@ -394,7 +394,7 @@ TEST(NamingServiceTest, consul_with_backup_file) {
restful_map
.
c_str
()));
ASSERT_EQ
(
0
,
server
.
Start
(
"localhost:8500"
,
NULL
));
bthread_usleep
(
1
000000
);
bthread_usleep
(
2
000000
);
butil
::
EndPoint
n1
;
ASSERT_EQ
(
0
,
butil
::
str2endpoint
(
"10.121.36.189:8003"
,
&
n1
));
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment