Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
62d0abd0
Commit
62d0abd0
authored
Sep 06, 2018
by
gejun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
SSL supports NS & connection_group support
parent
0f621581
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
322 additions
and
266 deletions
+322
-266
client.cpp
example/multi_threaded_echo_c++/client.cpp
+1
-1
server.cpp
example/multi_threaded_echo_c++/server.cpp
+3
-2
acceptor.cpp
src/brpc/acceptor.cpp
+3
-3
acceptor.h
src/brpc/acceptor.h
+3
-3
connections_service.cpp
src/brpc/builtin/connections_service.cpp
+7
-4
channel.cpp
src/brpc/channel.cpp
+103
-10
channel.h
src/brpc/channel.h
+9
-2
naming_service_thread.cpp
src/brpc/details/naming_service_thread.cpp
+22
-17
naming_service_thread.h
src/brpc/details/naming_service_thread.h
+7
-4
ssl_helper.cpp
src/brpc/details/ssl_helper.cpp
+39
-28
ssl_helper.h
src/brpc/details/ssl_helper.h
+5
-6
rtmp.cpp
src/brpc/rtmp.cpp
+2
-2
server.cpp
src/brpc/server.cpp
+40
-36
server.h
src/brpc/server.h
+5
-9
socket.cpp
src/brpc/socket.cpp
+0
-0
socket.h
src/brpc/socket.h
+9
-4
socket_inl.h
src/brpc/socket_inl.h
+0
-2
socket_map.cpp
src/brpc/socket_map.cpp
+15
-83
socket_map.h
src/brpc/socket_map.h
+48
-44
ssl_option.cpp
src/brpc/ssl_option.cpp
+1
-2
ssl_option.h
src/brpc/ssl_option.h
+0
-4
No files found.
example/multi_threaded_echo_c++/client.cpp
View file @
62d0abd0
...
...
@@ -95,7 +95,7 @@ int main(int argc, char* argv[]) {
// Initialize the channel, NULL means using default options.
brpc
::
ChannelOptions
options
;
options
.
ssl_options
.
enable
=
FLAGS_enable_ssl
;
options
.
ssl_options
=
std
::
make_shared
<
brpc
::
ChannelSSLOptions
>
()
;
options
.
protocol
=
FLAGS_protocol
;
options
.
connection_type
=
FLAGS_connection_type
;
options
.
connect_timeout_ms
=
std
::
min
(
FLAGS_timeout_ms
/
2
,
100
);
...
...
example/multi_threaded_echo_c++/server.cpp
View file @
62d0abd0
...
...
@@ -82,8 +82,9 @@ int main(int argc, char* argv[]) {
// Start the server.
brpc
::
ServerOptions
options
;
options
.
ssl_options
.
default_cert
.
certificate
=
"cert.pem"
;
options
.
ssl_options
.
default_cert
.
private_key
=
"key.pem"
;
options
.
ssl_options
=
std
::
make_shared
<
brpc
::
ServerSSLOptions
>
();
options
.
ssl_options
->
default_cert
.
certificate
=
"cert.pem"
;
options
.
ssl_options
->
default_cert
.
private_key
=
"key.pem"
;
options
.
idle_timeout_sec
=
FLAGS_idle_timeout_s
;
options
.
max_concurrency
=
FLAGS_max_concurrency
;
options
.
internal_port
=
FLAGS_internal_port
;
...
...
src/brpc/acceptor.cpp
View file @
62d0abd0
...
...
@@ -44,8 +44,8 @@ Acceptor::~Acceptor() {
Join
();
}
int
Acceptor
::
StartAccept
(
int
listened_fd
,
int
idle_timeout_sec
,
SSL_CTX
*
ssl_ctx
)
{
int
Acceptor
::
StartAccept
(
int
listened_fd
,
int
idle_timeout_sec
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
)
{
if
(
listened_fd
<
0
)
{
LOG
(
FATAL
)
<<
"Invalid listened_fd="
<<
listened_fd
;
return
-
1
;
...
...
@@ -271,7 +271,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
options
.
remote_side
=
butil
::
EndPoint
(
*
(
sockaddr_in
*
)
&
in_addr
);
options
.
user
=
acception
->
user
();
options
.
on_edge_triggered_events
=
InputMessenger
::
OnNewMessages
;
options
.
ssl_ctx
=
am
->
_ssl_ctx
;
options
.
initial_
ssl_ctx
=
am
->
_ssl_ctx
;
if
(
Socket
::
Create
(
options
,
&
socket_id
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to create Socket"
;
continue
;
...
...
src/brpc/acceptor.h
View file @
62d0abd0
...
...
@@ -53,7 +53,8 @@ public:
// transmission for `idle_timeout_sec' will be closed automatically iff
// `idle_timeout_sec' > 0
// Return 0 on success, -1 otherwise.
int
StartAccept
(
int
listened_fd
,
int
idle_timeout_sec
,
SSL_CTX
*
ssl_ctx
);
int
StartAccept
(
int
listened_fd
,
int
idle_timeout_sec
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
);
// [thread-safe] Stop accepting connections.
// `closewait_ms' is not used anymore.
...
...
@@ -104,8 +105,7 @@ private:
// The map containing all the accepted sockets
SocketMap
_socket_map
;
// Not owner
SSL_CTX
*
_ssl_ctx
;
std
::
shared_ptr
<
SocketSSLContext
>
_ssl_ctx
;
};
}
// namespace brpc
...
...
src/brpc/builtin/connections_service.cpp
View file @
62d0abd0
...
...
@@ -192,10 +192,13 @@ void ConnectionsService::PrintConnections(
// slow (because we have many connections here).
int
pref_index
=
ptr
->
preferred_index
();
SocketUniquePtr
first_sub
;
int
pooled_count
=
-
1
;
if
(
ptr
->
fd
()
<
0
)
{
int
numfree
=
0
;
int
numinflight
=
0
;
if
(
ptr
->
fd
()
<
0
)
{
ptr
->
GetPooledSocketStats
(
&
numfree
,
&
numinflight
);
if
(
ptr
->
GetPooledSocketStats
(
&
numfree
,
&
numinflight
))
{
pooled_count
=
numfree
+
numinflight
;
}
// Check preferred_index of any pooled sockets.
ptr
->
ListPooledSockets
(
&
first_id
,
1
);
if
(
!
first_id
.
empty
())
{
...
...
@@ -263,11 +266,11 @@ void ConnectionsService::PrintConnections(
}
os
<<
SSLStateToYesNo
(
ptr
->
ssl_state
(),
use_html
)
<<
bar
;
char
protname
[
32
];
if
(
!
ptr
->
CreatedByConnect
()
)
{
if
(
pooled_count
<
0
)
{
snprintf
(
protname
,
sizeof
(
protname
),
"%s"
,
pref_prot
);
}
else
{
snprintf
(
protname
,
sizeof
(
protname
),
"%s*%d"
,
pref_prot
,
numfree
+
numinfligh
t
);
pooled_coun
t
);
}
os
<<
min_width
(
protname
,
12
)
<<
bar
;
if
(
ptr
->
fd
()
>=
0
)
{
...
...
src/brpc/channel.cpp
View file @
62d0abd0
...
...
@@ -21,6 +21,7 @@
#include <gflags/gflags.h>
#include "butil/time.h" // milliseconds_from_now
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "bthread/unstable.h" // bthread_timer_add
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/compress.h"
...
...
@@ -32,7 +33,6 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/policy/esp_authenticator.h"
namespace
brpc
{
DECLARE_bool
(
enable_rpcz
);
...
...
@@ -50,8 +50,73 @@ ChannelOptions::ChannelOptions()
,
auth
(
NULL
)
,
retry_policy
(
NULL
)
,
ns_filter
(
NULL
)
,
connection_group
(
0
)
{}
static
ChannelSignature
ComputeChannelSignature
(
const
ChannelOptions
&
opt
)
{
if
(
opt
.
auth
==
NULL
&&
opt
.
ssl_options
==
NULL
&&
opt
.
connection_group
==
0
)
{
// Returning zeroized result by default is more intuitive for users.
return
ChannelSignature
();
}
uint32_t
seed
=
0
;
std
::
string
buf
;
buf
.
reserve
(
1024
);
butil
::
MurmurHash3_x64_128_Context
mm_ctx
;
do
{
buf
.
clear
();
butil
::
MurmurHash3_x64_128_Init
(
&
mm_ctx
,
seed
);
if
(
opt
.
connection_group
)
{
buf
.
append
(
"|conng="
);
buf
.
append
((
char
*
)
&
opt
.
connection_group
,
sizeof
(
opt
.
connection_group
));
}
if
(
opt
.
auth
)
{
buf
.
append
(
"|auth="
);
buf
.
append
((
char
*
)
&
opt
.
auth
,
sizeof
(
opt
.
auth
));
}
const
ChannelSSLOptions
*
ssl
=
opt
.
ssl_options
.
get
();
if
(
ssl
)
{
buf
.
push_back
(
'|'
);
buf
.
append
(
ssl
->
ciphers
);
buf
.
push_back
(
'|'
);
buf
.
append
(
ssl
->
protocols
);
buf
.
push_back
(
'|'
);
buf
.
append
(
ssl
->
sni_name
);
const
VerifyOptions
&
verify
=
ssl
->
verify
;
buf
.
push_back
(
'|'
);
buf
.
append
((
char
*
)
&
verify
.
verify_depth
,
sizeof
(
verify
.
verify_depth
));
buf
.
push_back
(
'|'
);
buf
.
append
(
verify
.
ca_file_path
);
}
else
{
// All disabled ChannelSSLOptions are the same
}
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
buf
.
data
(),
buf
.
size
());
buf
.
clear
();
if
(
ssl
)
{
const
CertInfo
&
cert
=
ssl
->
client_cert
;
if
(
!
cert
.
certificate
.
empty
())
{
// Certificate may be too long (PEM string) to fit into `buf'
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
cert
.
certificate
.
data
(),
cert
.
certificate
.
size
());
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
cert
.
private_key
.
data
(),
cert
.
private_key
.
size
());
}
}
// sni_filters has no effect in ChannelSSLOptions
ChannelSignature
result
;
butil
::
MurmurHash3_x64_128_Final
(
result
.
data
,
&
mm_ctx
);
if
(
result
!=
ChannelSignature
())
{
// the empty result is reserved for default case and cannot
// be used, increment the seed and retry.
return
result
;
}
++
seed
;
}
while
(
true
);
}
Channel
::
Channel
(
ProfilerLinker
)
:
_server_id
((
SocketId
)
-
1
)
,
_serialize_request
(
NULL
)
...
...
@@ -62,9 +127,8 @@ Channel::Channel(ProfilerLinker)
Channel
::~
Channel
()
{
if
(
_server_id
!=
(
SocketId
)
-
1
)
{
SocketMapRemove
(
SocketMapKey
(
_server_address
,
_options
.
ssl_options
,
_options
.
auth
));
const
ChannelSignature
sig
=
ComputeChannelSignature
(
_options
);
SocketMapRemove
(
SocketMapKey
(
_server_address
,
sig
));
}
}
...
...
@@ -125,11 +189,13 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
}
}
else
if
(
_options
.
protocol
==
brpc
::
PROTOCOL_HTTP
)
{
if
(
_raw_server_address
.
compare
(
0
,
5
,
"https"
)
==
0
)
{
_options
.
ssl_options
.
enable
=
true
;
if
(
_options
.
ssl_options
.
sni_name
.
empty
())
{
if
(
_options
.
ssl_options
==
NULL
)
{
_options
.
ssl_options
=
std
::
make_shared
<
ChannelSSLOptions
>
();
}
if
(
_options
.
ssl_options
->
sni_name
.
empty
())
{
int
port
;
ParseHostAndPortFromURL
(
_raw_server_address
.
c_str
(),
&
_options
.
ssl_options
.
sni_name
,
&
port
);
&
_options
.
ssl_options
->
sni_name
,
&
port
);
}
}
}
...
...
@@ -190,6 +256,26 @@ int Channel::Init(const char* server_addr, int port,
return
Init
(
point
,
options
);
}
static
int
CreateSocketSSLContext
(
const
ChannelOptions
&
options
,
ChannelSignature
*
sig
,
std
::
shared_ptr
<
SocketSSLContext
>*
ssl_ctx
)
{
if
(
options
.
ssl_options
!=
NULL
)
{
*
sig
=
ComputeChannelSignature
(
options
);
SSL_CTX
*
raw_ctx
=
CreateClientSSLContext
(
*
options
.
ssl_options
);
if
(
!
raw_ctx
)
{
LOG
(
ERROR
)
<<
"Fail to CreateClientSSLContext"
;
return
-
1
;
}
*
ssl_ctx
=
std
::
make_shared
<
SocketSSLContext
>
();
(
*
ssl_ctx
)
->
raw_ctx
=
raw_ctx
;
(
*
ssl_ctx
)
->
sni_name
=
options
.
ssl_options
->
sni_name
;
}
else
{
sig
->
Reset
();
(
*
ssl_ctx
)
=
NULL
;
}
return
0
;
}
int
Channel
::
Init
(
butil
::
EndPoint
server_addr_and_port
,
const
ChannelOptions
*
options
)
{
GlobalInitializeOrDie
();
...
...
@@ -202,9 +288,13 @@ int Channel::Init(butil::EndPoint server_addr_and_port,
return
-
1
;
}
_server_address
=
server_addr_and_port
;
if
(
SocketMapInsert
(
SocketMapKey
(
server_addr_and_port
,
_options
.
ssl_options
,
_options
.
auth
),
&
_server_id
)
!=
0
)
{
ChannelSignature
sig
;
std
::
shared_ptr
<
SocketSSLContext
>
ssl_ctx
;
if
(
CreateSocketSSLContext
(
_options
,
&
sig
,
&
ssl_ctx
)
!=
0
)
{
return
-
1
;
}
if
(
SocketMapInsert
(
SocketMapKey
(
server_addr_and_port
,
sig
),
&
_server_id
,
ssl_ctx
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to insert into SocketMap"
;
return
-
1
;
}
...
...
@@ -230,6 +320,9 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions
ns_opt
;
ns_opt
.
succeed_without_server
=
_options
.
succeed_without_server
;
ns_opt
.
log_succeed_without_server
=
_options
.
log_succeed_without_server
;
if
(
CreateSocketSSLContext
(
_options
,
&
ns_opt
.
channel_signature
,
&
ns_opt
.
ssl_ctx
)
!=
0
)
{
return
-
1
;
}
if
(
lb
->
Init
(
ns_url
,
lb_name
,
_options
.
ns_filter
,
&
ns_opt
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to initialize LoadBalancerWithNaming"
;
delete
lb
;
...
...
src/brpc/channel.h
View file @
62d0abd0
...
...
@@ -90,7 +90,7 @@ struct ChannelOptions {
bool
log_succeed_without_server
;
// SSL related options. Refer to `ChannelSSLOptions' for details
ChannelSSLOptions
ssl_options
;
std
::
shared_ptr
<
ChannelSSLOptions
>
ssl_options
;
// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
...
...
@@ -99,9 +99,10 @@ struct ChannelOptions {
const
Authenticator
*
auth
;
// Customize the error code that should be retried. The interface is
// defined src/brpc/retry_policy.h
// defined
in
src/brpc/retry_policy.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
const
RetryPolicy
*
retry_policy
;
// Filter ServerNodes (i.e. based on `tag' field of `ServerNode')
...
...
@@ -109,7 +110,13 @@ struct ChannelOptions {
// in src/brpc/naming_service_filter.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
const
NamingServiceFilter
*
ns_filter
;
// Channels with same connection_group share connections. In an another
// word, set to a different value to not share connections.
// Default: 0
int
connection_group
;
};
// A Channel represents a communication line to one server or multiple servers
...
...
src/brpc/details/naming_service_thread.cpp
View file @
62d0abd0
...
...
@@ -28,17 +28,18 @@
namespace
brpc
{
struct
NSKey
{
const
NamingService
*
ns
;
std
::
string
protocol
;
std
::
string
service_name
;
};
struct
NSKeyHasher
{
size_t
operator
()(
const
NSKey
&
nskey
)
const
{
return
butil
::
DefaultHasher
<
std
::
string
>
()(
nskey
.
service_name
)
*
101
+
(
uintptr_t
)
nskey
.
ns
;
*
101
+
butil
::
DefaultHasher
<
std
::
string
>
()(
nskey
.
protocol
)
;
}
};
inline
bool
operator
==
(
const
NSKey
&
k1
,
const
NSKey
&
k2
)
{
return
(
k1
.
ns
==
k2
.
ns
&&
k1
.
service_name
==
k2
.
service_name
);
return
k1
.
protocol
==
k2
.
protocol
&&
k1
.
service_name
==
k2
.
service_name
;
}
typedef
butil
::
FlatMap
<
NSKey
,
NamingServiceThread
*
,
NSKeyHasher
>
NamingServiceMap
;
...
...
@@ -58,7 +59,8 @@ NamingServiceThread::Actions::~Actions() {
// Remove all sockets from SocketMap
for
(
std
::
vector
<
ServerNode
>::
const_iterator
it
=
_last_servers
.
begin
();
it
!=
_last_servers
.
end
();
++
it
)
{
SocketMapRemove
(
SocketMapKey
(
it
->
addr
));
const
SocketMapKey
key
(
it
->
addr
,
_owner
->
_options
.
channel_signature
);
SocketMapRemove
(
key
);
}
EndWait
(
0
);
}
...
...
@@ -110,7 +112,8 @@ void NamingServiceThread::Actions::ResetServers(
// TODO: For each unique SocketMapKey (i.e. SSL settings), insert a new
// Socket. SocketMapKey may be passed through AddWatcher. Make sure
// to pick those Sockets with the right settings during OnAddedServers
CHECK_EQ
(
SocketMapInsert
(
SocketMapKey
(
_added
[
i
].
addr
),
&
tagged_id
.
id
),
0
);
const
SocketMapKey
key
(
_added
[
i
].
addr
,
_owner
->
_options
.
channel_signature
);
CHECK_EQ
(
0
,
SocketMapInsert
(
key
,
&
tagged_id
.
id
,
_owner
->
_options
.
ssl_ctx
));
_added_sockets
.
push_back
(
tagged_id
);
}
...
...
@@ -118,7 +121,8 @@ void NamingServiceThread::Actions::ResetServers(
for
(
size_t
i
=
0
;
i
<
_removed
.
size
();
++
i
)
{
ServerNodeWithId
tagged_id
;
tagged_id
.
node
=
_removed
[
i
];
CHECK_EQ
(
0
,
SocketMapFind
(
SocketMapKey
(
_removed
[
i
].
addr
),
&
tagged_id
.
id
));
const
SocketMapKey
key
(
_removed
[
i
].
addr
,
_owner
->
_options
.
channel_signature
);
CHECK_EQ
(
0
,
SocketMapFind
(
key
,
&
tagged_id
.
id
));
_removed_sockets
.
push_back
(
tagged_id
);
}
...
...
@@ -169,7 +173,8 @@ void NamingServiceThread::Actions::ResetServers(
for
(
size_t
i
=
0
;
i
<
_removed
.
size
();
++
i
)
{
// TODO: Remove all Sockets that have the same address in SocketMapKey.peer
// We may need another data structure to avoid linear cost
SocketMapRemove
(
SocketMapKey
(
_removed
[
i
].
addr
));
const
SocketMapKey
key
(
_removed
[
i
].
addr
,
_owner
->
_options
.
channel_signature
);
SocketMapRemove
(
key
);
}
if
(
!
_removed
.
empty
()
||
!
_added
.
empty
())
{
...
...
@@ -207,7 +212,6 @@ int NamingServiceThread::Actions::WaitForFirstBatchOfServers() {
NamingServiceThread
::
NamingServiceThread
()
:
_tid
(
0
)
,
_source_ns
(
NULL
)
,
_ns
(
NULL
)
,
_actions
(
this
)
{
}
...
...
@@ -215,8 +219,8 @@ NamingServiceThread::NamingServiceThread()
NamingServiceThread
::~
NamingServiceThread
()
{
RPC_VLOG
<<
"~NamingServiceThread("
<<
*
this
<<
')'
;
// Remove from g_nsthread_map first
if
(
_source_ns
!=
NULL
)
{
const
NSKey
key
=
{
_
source_ns
,
_service_name
};
if
(
!
_protocol
.
empty
()
)
{
const
NSKey
key
=
{
_
protocol
,
_service_name
};
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
g_nsthread_map_mutex
);
if
(
g_nsthread_map
!=
NULL
)
{
NamingServiceThread
**
ptr
=
g_nsthread_map
->
seek
(
key
);
...
...
@@ -255,15 +259,16 @@ void* NamingServiceThread::RunThis(void* arg) {
return
NULL
;
}
int
NamingServiceThread
::
Start
(
const
NamingService
*
naming_service
,
int
NamingServiceThread
::
Start
(
NamingService
*
naming_service
,
const
std
::
string
&
protocol
,
const
std
::
string
&
service_name
,
const
GetNamingServiceThreadOptions
*
opt_in
)
{
if
(
naming_service
==
NULL
)
{
LOG
(
ERROR
)
<<
"Param[naming_service] is NULL"
;
return
-
1
;
}
_
source_
ns
=
naming_service
;
_
ns
=
naming_service
->
New
()
;
_ns
=
naming_service
;
_
protocol
=
protocol
;
_service_name
=
service_name
;
if
(
opt_in
)
{
_options
=
*
opt_in
;
...
...
@@ -400,13 +405,13 @@ int GetNamingServiceThread(
LOG
(
ERROR
)
<<
"Invalid naming service url="
<<
url
;
return
-
1
;
}
const
NamingService
*
ns
=
NamingServiceExtension
()
->
Find
(
protocol
);
if
(
ns
==
NULL
)
{
const
NamingService
*
source_
ns
=
NamingServiceExtension
()
->
Find
(
protocol
);
if
(
source_
ns
==
NULL
)
{
LOG
(
ERROR
)
<<
"Unknown protocol="
<<
protocol
;
return
-
1
;
}
NSKey
key
;
key
.
ns
=
ns
;
key
.
protocol
=
protocol
;
key
.
service_name
=
service_name
;
bool
new_thread
=
false
;
butil
::
intrusive_ptr
<
NamingServiceThread
>
nsthread
;
...
...
@@ -452,7 +457,7 @@ int GetNamingServiceThread(
}
}
if
(
new_thread
)
{
if
(
nsthread
->
Start
(
ns
,
key
.
service_name
,
options
)
!=
0
)
{
if
(
nsthread
->
Start
(
source_ns
->
New
(),
key
.
protocol
,
key
.
service_name
,
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to start NamingServiceThread"
;
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
g_nsthread_map_mutex
);
g_nsthread_map
->
erase
(
key
);
...
...
src/brpc/details/naming_service_thread.h
View file @
62d0abd0
...
...
@@ -24,7 +24,7 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/naming_service.h" // NamingService
#include "brpc/naming_service_filter.h" // NamingServiceFilter
#include "brpc/socket_map.h"
namespace
brpc
{
...
...
@@ -46,6 +46,8 @@ struct GetNamingServiceThreadOptions {
bool
succeed_without_server
;
bool
log_succeed_without_server
;
ChannelSignature
channel_signature
;
std
::
shared_ptr
<
SocketSSLContext
>
ssl_ctx
;
};
// A dedicated thread to map a name to ServerIds
...
...
@@ -86,7 +88,9 @@ public:
NamingServiceThread
();
~
NamingServiceThread
();
int
Start
(
const
NamingService
*
ns
,
const
std
::
string
&
service_name
,
int
Start
(
NamingService
*
ns
,
const
std
::
string
&
protocol
,
const
std
::
string
&
service_name
,
const
GetNamingServiceThreadOptions
*
options
);
int
WaitForFirstBatchOfServers
();
...
...
@@ -106,9 +110,8 @@ private:
butil
::
Mutex
_mutex
;
bthread_t
_tid
;
// TODO: better use a name.
const
NamingService
*
_source_ns
;
NamingService
*
_ns
;
std
::
string
_protocol
;
std
::
string
_service_name
;
GetNamingServiceThreadOptions
_options
;
std
::
vector
<
ServerNodeWithId
>
_last_sockets
;
...
...
src/brpc/details/ssl_helper.cpp
View file @
62d0abd0
...
...
@@ -437,10 +437,6 @@ static int SetSSLOptions(SSL_CTX* ctx, const std::string& ciphers,
}
SSL_CTX
*
CreateClientSSLContext
(
const
ChannelSSLOptions
&
options
)
{
if
(
!
options
.
enable
)
{
return
NULL
;
}
std
::
unique_ptr
<
SSL_CTX
,
FreeSSLCTX
>
ssl_ctx
(
SSL_CTX_new
(
SSLv23_client_method
()));
if
(
!
ssl_ctx
)
{
...
...
@@ -770,51 +766,66 @@ int SSLDHInit() {
return
0
;
}
}
// namespace brpc
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
SSL
*
ssl
)
{
os
<<
"[SSL HANDSHAKE]"
<<
"
\n
* cipher: "
<<
SSL_get_cipher
(
ssl
)
<<
"
\n
* protocol: "
<<
SSL_get_version
(
ssl
)
<<
"
\n
* verify: "
<<
(
SSL_get_verify_mode
(
ssl
)
&
SSL_VERIFY_PEER
?
"success"
:
"none"
)
<<
"
\n
"
;
static
std
::
string
GetNextLevelSeparator
(
const
char
*
sep
)
{
if
(
sep
[
0
]
!=
'\n'
)
{
return
sep
;
}
const
size_t
left_len
=
strlen
(
sep
+
1
);
if
(
left_len
==
0
)
{
return
"
\n
"
;
}
std
::
string
new_sep
;
new_sep
.
reserve
(
left_len
*
2
+
1
);
new_sep
.
append
(
sep
,
left_len
+
1
);
new_sep
.
append
(
sep
+
1
,
left_len
);
return
new_sep
;
}
void
Print
(
std
::
ostream
&
os
,
SSL
*
ssl
,
const
char
*
sep
)
{
os
<<
"cipher="
<<
SSL_get_cipher
(
ssl
)
<<
sep
<<
"protocol="
<<
SSL_get_version
(
ssl
)
<<
sep
<<
"verify="
<<
(
SSL_get_verify_mode
(
ssl
)
&
SSL_VERIFY_PEER
?
"success"
:
"none"
);
X509
*
cert
=
SSL_get_peer_certificate
(
ssl
);
if
(
cert
)
{
os
<<
"
\n
"
<<
cert
;
os
<<
sep
<<
"peer_certificate={"
;
const
std
::
string
new_sep
=
GetNextLevelSeparator
(
sep
);
if
(
sep
[
0
]
==
'\n'
)
{
os
<<
new_sep
;
}
Print
(
os
,
cert
,
new_sep
.
c_str
());
if
(
sep
[
0
]
==
'\n'
)
{
os
<<
sep
;
}
os
<<
'}'
;
}
return
os
;
}
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
X509
*
cert
)
{
void
Print
(
std
::
ostream
&
os
,
X509
*
cert
,
const
char
*
sep
)
{
BIO
*
buf
=
BIO_new
(
BIO_s_mem
());
if
(
buf
==
NULL
)
{
return
os
;
return
;
}
BIO_printf
(
buf
,
"[CERTIFICATE]"
);
BIO_printf
(
buf
,
"
\n
* subject: "
);
BIO_printf
(
buf
,
"subject="
);
X509_NAME_print
(
buf
,
X509_get_subject_name
(
cert
),
0
);
BIO_printf
(
buf
,
"
\n
* start date: "
);
BIO_printf
(
buf
,
"
%sstart_date="
,
sep
);
ASN1_TIME_print
(
buf
,
X509_get_notBefore
(
cert
));
BIO_printf
(
buf
,
"
\n
* expire date: "
);
BIO_printf
(
buf
,
"
%sexpire_date="
,
sep
);
ASN1_TIME_print
(
buf
,
X509_get_notAfter
(
cert
));
BIO_printf
(
buf
,
"
\n
* common name: "
);
BIO_printf
(
buf
,
"
%scommon_name="
,
sep
);
std
::
vector
<
std
::
string
>
hostnames
;
brpc
::
ExtractHostnames
(
cert
,
&
hostnames
);
for
(
size_t
i
=
0
;
i
<
hostnames
.
size
();
++
i
)
{
BIO_printf
(
buf
,
"%s;
"
,
hostnames
[
i
].
c_str
());
BIO_printf
(
buf
,
"%s;"
,
hostnames
[
i
].
c_str
());
}
BIO_printf
(
buf
,
"
\n
* issuer: "
);
BIO_printf
(
buf
,
"
%sissuer="
,
sep
);
X509_NAME_print
(
buf
,
X509_get_issuer_name
(
cert
),
0
);
BIO_printf
(
buf
,
"
\n
"
);
char
*
bufp
=
NULL
;
int
len
=
BIO_get_mem_data
(
buf
,
&
bufp
);
os
<<
butil
::
StringPiece
(
bufp
,
len
);
return
os
;
}
}
// namespace brpc
src/brpc/details/ssl_helper.h
View file @
62d0abd0
...
...
@@ -22,8 +22,7 @@
// For some versions of openssl, SSL_* are defined inside this header
#include <openssl/ossl_typ.h>
#include "brpc/socket_id.h" // SocketId
#include "brpc/ssl_option.h" // SSLOptions
#include "brpc/ssl_option.h" // ServerSSLOptions
namespace
brpc
{
...
...
@@ -76,7 +75,7 @@ SSL_CTX* CreateClientSSLContext(const ChannelSSLOptions& options);
// fields into `hostnames'
SSL_CTX
*
CreateServerSSLContext
(
const
std
::
string
&
certificate_file
,
const
std
::
string
&
private_key_file
,
const
SSLOptions
&
options
,
const
S
erverS
SLOptions
&
options
,
std
::
vector
<
std
::
string
>*
hostnames
);
// Create a new SSL (per connection object) using configurations in `ctx'.
...
...
@@ -92,9 +91,9 @@ void AddBIOBuffer(SSL* ssl, int fd, int bufsize);
// set to indicate the reason (0 for EOF)
SSLState
DetectSSLState
(
int
fd
,
int
*
error_code
);
}
// namespace brpc
void
Print
(
std
::
ostream
&
os
,
SSL
*
ssl
,
const
char
*
sep
);
void
Print
(
std
::
ostream
&
os
,
X509
*
cert
,
const
char
*
sep
);
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
SSL
*
ssl
);
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
X509
*
cert
);
}
// namespace brpc
#endif // BRPC_SSL_HELPER_H
src/brpc/rtmp.cpp
View file @
62d0abd0
...
...
@@ -1683,7 +1683,7 @@ void RtmpClientStream::CleanupSocketForStream(
Socket
*
prev_sock
,
Controller
*
,
int
/*error_code*/
)
{
if
(
prev_sock
)
{
if
(
_from_socketmap
)
{
_client_impl
->
socket_map
().
Remove
(
prev_sock
->
remote_side
(
),
_client_impl
->
socket_map
().
Remove
(
SocketMapKey
(
prev_sock
->
remote_side
()
),
prev_sock
->
id
());
}
else
{
prev_sock
->
SetFailed
();
// not necessary, already failed.
...
...
@@ -1888,7 +1888,7 @@ void RtmpClientStream::OnStopInternal() {
LOG
(
FATAL
)
<<
"RtmpContext of "
<<
*
_rtmpsock
<<
" is NULL"
;
}
if
(
_from_socketmap
)
{
_client_impl
->
socket_map
().
Remove
(
_rtmpsock
->
remote_side
(
),
_client_impl
->
socket_map
().
Remove
(
SocketMapKey
(
_rtmpsock
->
remote_side
()
),
_rtmpsock
->
id
());
}
else
{
_rtmpsock
->
ReleaseAdditionalReference
();
...
...
src/brpc/server.cpp
View file @
62d0abd0
...
...
@@ -840,14 +840,18 @@ int Server::StartInternal(const butil::ip_t& ip,
// Free last SSL contexts
FreeSSLContexts
();
CertInfo
&
default_cert
=
_options
.
ssl_options
.
default_cert
;
if
(
!
default_cert
.
certificate
.
empty
())
{
if
(
_options
.
ssl_options
)
{
CertInfo
&
default_cert
=
_options
.
ssl_options
->
default_cert
;
if
(
default_cert
.
certificate
.
empty
())
{
LOG
(
ERROR
)
<<
"default_cert is empty"
;
return
-
1
;
}
if
(
AddCertificate
(
default_cert
)
!=
0
)
{
return
-
1
;
}
_default_ssl_ctx
=
_ssl_ctx_map
.
begin
()
->
second
.
ctx
;
const
std
::
vector
<
CertInfo
>&
certs
=
_options
.
ssl_options
.
certs
;
const
std
::
vector
<
CertInfo
>&
certs
=
_options
.
ssl_options
->
certs
;
for
(
size_t
i
=
0
;
i
<
certs
.
size
();
++
i
)
{
if
(
AddCertificate
(
certs
[
i
])
!=
0
)
{
return
-
1
;
...
...
@@ -1791,6 +1795,10 @@ Server::FindServicePropertyByName(const butil::StringPiece& name) const {
}
int
Server
::
AddCertificate
(
const
CertInfo
&
cert
)
{
if
(
_options
.
ssl_options
==
NULL
)
{
LOG
(
ERROR
)
<<
"ServerOptions.ssl_options is not configured yet"
;
return
-
1
;
}
std
::
string
cert_key
(
cert
.
certificate
);
cert_key
.
append
(
cert
.
private_key
);
if
(
_ssl_ctx_map
.
seek
(
cert_key
)
!=
NULL
)
{
...
...
@@ -1800,15 +1808,17 @@ int Server::AddCertificate(const CertInfo& cert) {
SSLContext
ssl_ctx
;
ssl_ctx
.
filters
=
cert
.
sni_filters
;
ssl_ctx
.
ctx
=
CreateServerSSLContext
(
cert
.
certificate
,
cert
.
private_key
,
_options
.
ssl_options
,
&
ssl_ctx
.
filters
);
if
(
ssl_ctx
.
ctx
==
NULL
)
{
ssl_ctx
.
ctx
=
std
::
make_shared
<
SocketSSLContext
>
();
SSL_CTX
*
raw_ctx
=
CreateServerSSLContext
(
cert
.
certificate
,
cert
.
private_key
,
*
_options
.
ssl_options
,
&
ssl_ctx
.
filters
);
if
(
raw_ctx
==
NULL
)
{
return
-
1
;
}
ssl_ctx
.
ctx
->
raw_ctx
=
raw_ctx
;
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback
(
ssl_ctx
.
ctx
,
SSLSwitchCTXByHostname
);
SSL_CTX_set_tlsext_servername_arg
(
ssl_ctx
.
ctx
,
this
);
SSL_CTX_set_tlsext_servername_callback
(
ssl_ctx
.
ctx
->
raw_ctx
,
SSLSwitchCTXByHostname
);
SSL_CTX_set_tlsext_servername_arg
(
ssl_ctx
.
ctx
->
raw_ctx
,
this
);
#endif
if
(
!
_reload_cert_maps
.
Modify
(
AddCertMapping
,
ssl_ctx
))
{
...
...
@@ -1850,6 +1860,10 @@ bool Server::AddCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
}
int
Server
::
RemoveCertificate
(
const
CertInfo
&
cert
)
{
if
(
_options
.
ssl_options
==
NULL
)
{
LOG
(
ERROR
)
<<
"ServerOptions.ssl_options is not configured yet"
;
return
-
1
;
}
std
::
string
cert_key
(
cert
.
certificate
);
cert_key
.
append
(
cert
.
private_key
);
SSLContext
*
ssl_ctx
=
_ssl_ctx_map
.
seek
(
cert_key
);
...
...
@@ -1868,8 +1882,6 @@ int Server::RemoveCertificate(const CertInfo& cert) {
return
-
1
;
}
// After a successful Modify, now it's safe to erase SSLContext
SSL_CTX_free
(
ssl_ctx
->
ctx
);
_ssl_ctx_map
.
erase
(
cert_key
);
return
0
;
}
...
...
@@ -1884,7 +1896,7 @@ bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
}
else
{
cmap
=
&
(
bg
.
cert_map
);
}
SSL_CTX
*
*
ctx
=
cmap
->
seek
(
hostname
);
std
::
shared_ptr
<
SocketSSLContext
>
*
ctx
=
cmap
->
seek
(
hostname
);
if
(
ctx
!=
NULL
&&
*
ctx
==
ssl_ctx
.
ctx
)
{
cmap
->
erase
(
hostname
);
}
...
...
@@ -1893,6 +1905,11 @@ bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
}
int
Server
::
ResetCertificates
(
const
std
::
vector
<
CertInfo
>&
certs
)
{
if
(
_options
.
ssl_options
==
NULL
)
{
LOG
(
ERROR
)
<<
"ServerOptions.ssl_options is not configured yet"
;
return
-
1
;
}
SSLContextMap
tmp_map
;
if
(
tmp_map
.
init
(
INITIAL_CERT_MAP
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to initialize tmp_map"
;
...
...
@@ -1901,8 +1918,8 @@ int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
// Add default certficiate into tmp_map first since it can't be reloaded
std
::
string
default_cert_key
=
_options
.
ssl_options
.
default_cert
.
certificate
+
_options
.
ssl_options
.
default_cert
.
private_key
;
_options
.
ssl_options
->
default_cert
.
certificate
+
_options
.
ssl_options
->
default_cert
.
private_key
;
tmp_map
[
default_cert_key
]
=
_ssl_ctx_map
[
default_cert_key
];
for
(
size_t
i
=
0
;
i
<
certs
.
size
();
++
i
)
{
...
...
@@ -1915,28 +1932,26 @@ int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
SSLContext
ssl_ctx
;
ssl_ctx
.
filters
=
certs
[
i
].
sni_filters
;
ssl_ctx
.
ctx
=
CreateServerSSLContext
(
ssl_ctx
.
ctx
=
std
::
make_shared
<
SocketSSLContext
>
();
ssl_ctx
.
ctx
->
raw_ctx
=
CreateServerSSLContext
(
certs
[
i
].
certificate
,
certs
[
i
].
private_key
,
_options
.
ssl_options
,
&
ssl_ctx
.
filters
);
if
(
ssl_ctx
.
ctx
==
NULL
)
{
FreeSSLContextMap
(
tmp_map
,
true
);
*
_options
.
ssl_options
,
&
ssl_ctx
.
filters
);
if
(
ssl_ctx
.
ctx
->
raw_ctx
==
NULL
)
{
return
-
1
;
}
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback
(
ssl_ctx
.
ctx
,
SSLSwitchCTXByHostname
);
SSL_CTX_set_tlsext_servername_arg
(
ssl_ctx
.
ctx
,
this
);
SSL_CTX_set_tlsext_servername_callback
(
ssl_ctx
.
ctx
->
raw_ctx
,
SSLSwitchCTXByHostname
);
SSL_CTX_set_tlsext_servername_arg
(
ssl_ctx
.
ctx
->
raw_ctx
,
this
);
#endif
tmp_map
[
cert_key
]
=
ssl_ctx
;
}
if
(
!
_reload_cert_maps
.
Modify
(
ResetCertMappings
,
tmp_map
))
{
FreeSSLContextMap
(
tmp_map
,
true
);
return
-
1
;
}
_ssl_ctx_map
.
swap
(
tmp_map
);
FreeSSLContextMap
(
tmp_map
,
true
);
return
0
;
}
...
...
@@ -1976,19 +1991,8 @@ bool Server::ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map) {
return
true
;
}
void
Server
::
FreeSSLContextMap
(
SSLContextMap
&
ctx_map
,
bool
keep_default
)
{
for
(
SSLContextMap
::
iterator
it
=
ctx_map
.
begin
();
it
!=
ctx_map
.
end
();
++
it
)
{
if
(
keep_default
&&
it
->
second
.
ctx
==
_default_ssl_ctx
)
{
continue
;
}
SSL_CTX_free
(
it
->
second
.
ctx
);
}
ctx_map
.
clear
();
}
void
Server
::
FreeSSLContexts
()
{
FreeSSLContextMap
(
_ssl_ctx_map
,
false
);
_ssl_ctx_map
.
clear
(
);
_reload_cert_maps
.
Modify
(
ClearCertMapping
);
_default_ssl_ctx
=
NULL
;
}
...
...
@@ -2082,7 +2086,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
int
*
al
,
Server
*
server
)
{
(
void
)
al
;
const
char
*
hostname
=
SSL_get_servername
(
ssl
,
TLSEXT_NAMETYPE_host_name
);
bool
strict_sni
=
server
->
_options
.
ssl_options
.
strict_sni
;
bool
strict_sni
=
server
->
_options
.
ssl_options
->
strict_sni
;
if
(
hostname
==
NULL
)
{
return
strict_sni
?
SSL_TLSEXT_ERR_ALERT_FATAL
:
SSL_TLSEXT_ERR_NOACK
;
}
...
...
@@ -2092,7 +2096,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
return
SSL_TLSEXT_ERR_ALERT_FATAL
;
}
SSL_CTX
*
*
pctx
=
s
->
cert_map
.
seek
(
hostname
);
std
::
shared_ptr
<
SocketSSLContext
>
*
pctx
=
s
->
cert_map
.
seek
(
hostname
);
if
(
pctx
==
NULL
)
{
const
char
*
dot
=
hostname
;
for
(;
*
dot
!=
'\0'
;
++
dot
)
{
...
...
@@ -2114,7 +2118,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
}
// Switch SSL_CTX to the one with correct hostname
SSL_set_SSL_CTX
(
ssl
,
*
p
ctx
);
SSL_set_SSL_CTX
(
ssl
,
(
*
pctx
)
->
raw_
ctx
);
return
SSL_TLSEXT_ERR_OK
;
}
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
...
...
src/brpc/server.h
View file @
62d0abd0
...
...
@@ -38,10 +38,6 @@
#include "brpc/health_reporter.h"
#include "brpc/adaptive_max_concurrency.h"
extern
"C"
{
struct
ssl_ctx_st
;
}
namespace
brpc
{
class
Acceptor
;
...
...
@@ -52,6 +48,7 @@ class SimpleDataPool;
class
MongoServiceAdaptor
;
class
RestfulMap
;
class
RtmpService
;
class
SocketSSLContext
;
struct
ServerOptions
{
// Constructed with default options.
...
...
@@ -202,7 +199,7 @@ struct ServerOptions {
bool
security_mode
()
const
{
return
internal_port
>=
0
||
!
has_builtin_services
;
}
// SSL related options. Refer to `ServerSSLOptions' for details
ServerSSLOptions
ssl_options
;
std
::
shared_ptr
<
ServerSSLOptions
>
ssl_options
;
// [CAUTION] This option is for implementing specialized http proxies,
// most users don't need it. Don't change this option unless you fully
...
...
@@ -573,21 +570,20 @@ friend class Controller;
std
::
string
ServerPrefix
()
const
;
// Mapping from hostname to corresponding SSL_CTX
typedef
butil
::
CaseIgnoredFlatMap
<
st
ruct
ssl_ctx_st
*
>
CertMap
;
typedef
butil
::
CaseIgnoredFlatMap
<
st
d
::
shared_ptr
<
SocketSSLContext
>
>
CertMap
;
struct
CertMaps
{
CertMap
cert_map
;
CertMap
wildcard_cert_map
;
};
struct
SSLContext
{
st
ruct
ssl_ctx_st
*
ctx
;
st
d
::
shared_ptr
<
SocketSSLContext
>
ctx
;
std
::
vector
<
std
::
string
>
filters
;
};
// Mapping from [certficate + private-key] to SSLContext
typedef
butil
::
FlatMap
<
std
::
string
,
SSLContext
>
SSLContextMap
;
void
FreeSSLContexts
();
void
FreeSSLContextMap
(
SSLContextMap
&
ctx_map
,
bool
keep_default
);
static
int
SSLSwitchCTXByHostname
(
struct
ssl_st
*
ssl
,
int
*
al
,
Server
*
server
);
...
...
@@ -636,7 +632,7 @@ friend class Controller;
RestfulMap
*
_global_restful_map
;
// Default certficate which can't be reloaded
st
ruct
ssl_ctx_st
*
_default_ssl_ctx
;
st
d
::
shared_ptr
<
SocketSSLContext
>
_default_ssl_ctx
;
// Reloadable SSL mappings
butil
::
DoublyBufferedData
<
CertMaps
>
_reload_cert_maps
;
...
...
src/brpc/socket.cpp
View file @
62d0abd0
This diff is collapsed.
Click to expand it.
src/brpc/socket.h
View file @
62d0abd0
...
...
@@ -38,7 +38,6 @@
#include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr
namespace
brpc
{
namespace
policy
{
class
ConsistentHashingLoadBalancer
;
...
...
@@ -137,6 +136,14 @@ struct PipelinedInfo {
bthread_id_t
id_wait
;
};
struct
SocketSSLContext
{
SocketSSLContext
();
~
SocketSSLContext
();
SSL_CTX
*
raw_ctx
;
// owned
std
::
string
sni_name
;
// useful for clients
};
// TODO: Comment fields
struct
SocketOptions
{
SocketOptions
();
...
...
@@ -155,9 +162,7 @@ struct SocketOptions {
// one thread at any time.
void
(
*
on_edge_triggered_events
)(
Socket
*
);
int
health_check_interval_s
;
bool
owns_ssl_ctx
;
SSL_CTX
*
ssl_ctx
;
std
::
string
sni_name
;
std
::
shared_ptr
<
SocketSSLContext
>
initial_ssl_ctx
;
bthread_keytable_pool_t
*
keytable_pool
;
SocketConnection
*
conn
;
AppConnect
*
app_connect
;
...
...
src/brpc/socket_inl.h
View file @
62d0abd0
...
...
@@ -54,8 +54,6 @@ inline SocketOptions::SocketOptions()
,
user
(
NULL
)
,
on_edge_triggered_events
(
NULL
)
,
health_check_interval_s
(
-
1
)
,
owns_ssl_ctx
(
false
)
,
ssl_ctx
(
NULL
)
,
keytable_pool
(
NULL
)
,
conn
(
NULL
)
,
app_connect
(
NULL
)
...
...
src/brpc/socket_map.cpp
View file @
62d0abd0
...
...
@@ -21,14 +21,11 @@
#include "butil/time.h"
#include "butil/scoped_lock.h"
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "brpc/log.h"
#include "brpc/protocol.h"
#include "brpc/input_messenger.h"
#include "brpc/reloadable_flags.h"
#include "brpc/socket_map.h"
#include "brpc/details/ssl_helper.h" // CreateClientSSLContext
namespace
brpc
{
...
...
@@ -88,62 +85,9 @@ SocketMap* get_or_new_client_side_socket_map() {
return
g_socket_map
.
load
(
butil
::
memory_order_consume
);
}
void
ComputeSocketMapKeyChecksum
(
const
SocketMapKey
&
key
,
unsigned
char
*
checksum
)
{
butil
::
MurmurHash3_x64_128_Context
mm_ctx
;
butil
::
MurmurHash3_x64_128_Init
(
&
mm_ctx
,
0
);
const
int
BUFSIZE
=
1024
;
// Should be enough
char
buf
[
BUFSIZE
];
int
cur_len
=
0
;
#define SAFE_MEMCOPY(dst, cur_len, src, size) \
do { \
int copy_len = std::min((int)size, BUFSIZE - cur_len); \
if (copy_len > 0) { \
memcpy(dst + cur_len, src, copy_len); \
cur_len += copy_len; \
} \
} while (0);
std
::
size_t
ephash
=
butil
::
DefaultHasher
<
butil
::
EndPoint
>
()(
key
.
peer
);
SAFE_MEMCOPY
(
buf
,
cur_len
,
&
ephash
,
sizeof
(
ephash
));
SAFE_MEMCOPY
(
buf
,
cur_len
,
&
key
.
auth
,
sizeof
(
key
.
auth
));
const
ChannelSSLOptions
&
ssl
=
key
.
ssl_options
;
SAFE_MEMCOPY
(
buf
,
cur_len
,
&
ssl
.
enable
,
sizeof
(
ssl
.
enable
));
if
(
ssl
.
enable
)
{
SAFE_MEMCOPY
(
buf
,
cur_len
,
ssl
.
ciphers
.
data
(),
ssl
.
ciphers
.
size
());
SAFE_MEMCOPY
(
buf
,
cur_len
,
ssl
.
protocols
.
data
(),
ssl
.
protocols
.
size
());
SAFE_MEMCOPY
(
buf
,
cur_len
,
ssl
.
sni_name
.
data
(),
ssl
.
sni_name
.
size
());
const
VerifyOptions
&
verify
=
ssl
.
verify
;
SAFE_MEMCOPY
(
buf
,
cur_len
,
&
verify
.
verify_depth
,
sizeof
(
verify
.
verify_depth
));
if
(
verify
.
verify_depth
>
0
)
{
SAFE_MEMCOPY
(
buf
,
cur_len
,
verify
.
ca_file_path
.
data
(),
verify
.
ca_file_path
.
size
());
}
}
else
{
// All disabled ChannelSSLOptions are the same
}
#undef SAFE_MEMCOPY
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
buf
,
cur_len
);
const
CertInfo
&
cert
=
ssl
.
client_cert
;
if
(
ssl
.
enable
&&
!
cert
.
certificate
.
empty
())
{
// Certificate may be too long (PEM string) to fit into `buf'
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
cert
.
certificate
.
data
(),
cert
.
certificate
.
size
());
butil
::
MurmurHash3_x64_128_Update
(
&
mm_ctx
,
cert
.
private_key
.
data
(),
cert
.
private_key
.
size
());
// sni_filters has no effect in ChannelSSLOptions
}
butil
::
MurmurHash3_x64_128_Final
(
checksum
,
&
mm_ctx
);
}
int
SocketMapInsert
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
return
get_or_new_client_side_socket_map
()
->
Insert
(
key
,
id
);
int
SocketMapInsert
(
const
SocketMapKey
&
key
,
SocketId
*
id
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
)
{
return
get_or_new_client_side_socket_map
()
->
Insert
(
key
,
id
,
ssl_ctx
);
}
int
SocketMapFind
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
...
...
@@ -264,10 +208,10 @@ void SocketMap::PrintSocketMap(std::ostream& os, void* arg) {
static_cast
<
SocketMap
*>
(
arg
)
->
Print
(
os
);
}
int
SocketMap
::
Insert
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
SocketMapKeyChecksum
ck
(
key
);
int
SocketMap
::
Insert
(
const
SocketMapKey
&
key
,
SocketId
*
id
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
)
{
std
::
unique_lock
<
butil
::
Mutex
>
mu
(
_mutex
);
SingleConnection
*
sc
=
_map
.
seek
(
ck
);
SingleConnection
*
sc
=
_map
.
seek
(
key
);
if
(
sc
)
{
if
(
!
sc
->
socket
->
Failed
()
||
sc
->
socket
->
health_check_interval
()
>
0
/*HC enabled*/
)
{
...
...
@@ -277,30 +221,20 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id) {
}
// A socket w/o HC is failed (permanently), replace it.
SocketUniquePtr
ptr
(
sc
->
socket
);
// Remove the ref added at insertion.
_map
.
erase
(
ck
);
// in principle, we can override the entry in map w/o
_map
.
erase
(
key
);
// in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
// error-prone. We prefer code maintainability here.
sc
=
NULL
;
}
std
::
unique_ptr
<
SSL_CTX
,
FreeSSLCTX
>
ssl_ctx
(
CreateClientSSLContext
(
key
.
ssl_options
));
if
(
key
.
ssl_options
.
enable
&&
!
ssl_ctx
)
{
return
-
1
;
}
SocketId
tmp_id
;
SocketOptions
opt
;
opt
.
remote_side
=
key
.
peer
;
// Can't save SSL_CTX in SocketMap since SingleConnection's desctruction
// may happen before Socket's destruction (remove Channel before RPC complete)
opt
.
owns_ssl_ctx
=
true
;
opt
.
ssl_ctx
=
ssl_ctx
.
get
();
opt
.
sni_name
=
key
.
ssl_options
.
sni_name
;
opt
.
initial_ssl_ctx
=
ssl_ctx
;
if
(
_options
.
socket_creator
->
CreateSocket
(
opt
,
&
tmp_id
)
!=
0
)
{
PLOG
(
FATAL
)
<<
"Fail to create socket to "
<<
key
.
peer
;
return
-
1
;
}
ssl_ctx
.
release
();
// Add a reference to make sure that sc->socket is always accessible. Not
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
...
...
@@ -310,7 +244,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id) {
return
-
1
;
}
SingleConnection
new_sc
=
{
1
,
ptr
.
release
(),
0
};
_map
[
ck
]
=
new_sc
;
_map
[
key
]
=
new_sc
;
*
id
=
tmp_id
;
bool
need_to_create_bvar
=
false
;
if
(
FLAGS_show_socketmap_in_vars
&&
!
_exposed_in_bvar
)
{
...
...
@@ -334,9 +268,8 @@ void SocketMap::Remove(const SocketMapKey& key, SocketId expected_id) {
void
SocketMap
::
RemoveInternal
(
const
SocketMapKey
&
key
,
SocketId
expected_id
,
bool
remove_orphan
)
{
SocketMapKeyChecksum
ck
(
key
);
std
::
unique_lock
<
butil
::
Mutex
>
mu
(
_mutex
);
SingleConnection
*
sc
=
_map
.
seek
(
ck
);
SingleConnection
*
sc
=
_map
.
seek
(
key
);
if
(
!
sc
)
{
return
;
}
...
...
@@ -354,7 +287,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
sc
->
no_ref_us
=
butil
::
cpuwide_time_us
();
}
else
{
Socket
*
const
s
=
sc
->
socket
;
_map
.
erase
(
ck
);
_map
.
erase
(
key
);
bool
need_to_create_bvar
=
false
;
if
(
FLAGS_show_socketmap_in_vars
&&
!
_exposed_in_bvar
)
{
_exposed_in_bvar
=
true
;
...
...
@@ -374,9 +307,8 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
}
int
SocketMap
::
Find
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
SocketMapKeyChecksum
ck
(
key
);
BAIDU_SCOPED_LOCK
(
_mutex
);
SingleConnection
*
sc
=
_map
.
seek
(
ck
);
SingleConnection
*
sc
=
_map
.
seek
(
key
);
if
(
sc
)
{
*
id
=
sc
->
socket
->
id
();
return
0
;
...
...
@@ -400,14 +332,14 @@ void SocketMap::List(std::vector<butil::EndPoint>* pts) {
}
}
void
SocketMap
::
ListOrphans
(
int64_t
defer_us
,
std
::
vector
<
butil
::
EndPoint
>*
out
)
{
void
SocketMap
::
ListOrphans
(
int64_t
defer_us
,
std
::
vector
<
SocketMapKey
>*
out
)
{
out
->
clear
();
const
int64_t
now
=
butil
::
cpuwide_time_us
();
BAIDU_SCOPED_LOCK
(
_mutex
);
for
(
Map
::
iterator
it
=
_map
.
begin
();
it
!=
_map
.
end
();
++
it
)
{
SingleConnection
&
sc
=
it
->
second
;
if
(
sc
.
ref_count
==
0
&&
now
-
sc
.
no_ref_us
>=
defer_us
)
{
out
->
push_back
(
it
->
first
.
peer
);
out
->
push_back
(
it
->
first
);
}
}
}
...
...
@@ -420,7 +352,7 @@ void* SocketMap::RunWatchConnections(void* arg) {
void
SocketMap
::
WatchConnections
()
{
std
::
vector
<
SocketId
>
main_sockets
;
std
::
vector
<
SocketId
>
pooled_sockets
;
std
::
vector
<
butil
::
EndPoint
>
orphan_sockets
;
std
::
vector
<
SocketMapKey
>
orphan_sockets
;
const
uint64_t
CHECK_INTERVAL_US
=
1000000UL
;
while
(
bthread_usleep
(
CHECK_INTERVAL_US
)
==
0
)
{
// NOTE: save the gflag which may be reloaded at any time.
...
...
src/brpc/socket_map.h
View file @
62d0abd0
...
...
@@ -22,7 +22,6 @@
#include "butil/containers/flat_map.h" // FlatMap
#include "brpc/socket_id.h" // SockdetId
#include "brpc/options.pb.h" // ProtocolType
#include "brpc/ssl_option.h" // ChannelSSLOptions
#include "brpc/input_messenger.h" // InputMessageHandler
...
...
@@ -30,29 +29,57 @@ namespace brpc {
// Global mapping from remote-side to out-going sockets created by Channels.
struct
ChannelSignature
{
uint64_t
data
[
2
];
ChannelSignature
()
{
Reset
();
}
void
Reset
()
{
data
[
0
]
=
data
[
1
]
=
0
;
}
};
inline
bool
operator
==
(
const
ChannelSignature
&
s1
,
const
ChannelSignature
&
s2
)
{
return
s1
.
data
[
0
]
==
s2
.
data
[
0
]
&&
s1
.
data
[
1
]
==
s2
.
data
[
1
];
}
inline
bool
operator
!=
(
const
ChannelSignature
&
s1
,
const
ChannelSignature
&
s2
)
{
return
!
(
s1
==
s2
);
}
// The following fields uniquely define a Socket. In other word,
// Socket can't be shared between 2 different SocketMapKeys
struct
SocketMapKey
{
SocketMapKey
(
const
butil
::
EndPoint
&
pt
,
ChannelSSLOptions
ssl
=
ChannelSSLOptions
(),
const
Authenticator
*
auth2
=
NULL
)
:
peer
(
pt
),
ssl_options
(
ssl
),
auth
(
auth2
)
explicit
SocketMapKey
(
const
butil
::
EndPoint
&
pt
)
:
peer
(
pt
)
{}
SocketMapKey
(
const
butil
::
EndPoint
&
pt
,
const
ChannelSignature
&
cs
)
:
peer
(
pt
),
channel_signature
(
cs
)
{}
butil
::
EndPoint
peer
;
ChannelSSLOptions
ssl_options
;
const
Authenticator
*
auth
;
ChannelSignature
channel_signature
;
};
inline
bool
operator
==
(
const
SocketMapKey
&
k1
,
const
SocketMapKey
&
k2
)
{
return
k1
.
peer
==
k2
.
peer
&&
k1
.
channel_signature
==
k2
.
channel_signature
;
};
struct
SocketMapKeyHasher
{
std
::
size_t
operator
()(
const
SocketMapKey
&
key
)
const
{
return
butil
::
DefaultHasher
<
butil
::
EndPoint
>
()(
key
.
peer
)
^
key
.
channel_signature
.
data
[
1
];
}
};
// Calculate an 128-bit hashcode for SocketMapKey
void
ComputeSocketMapKeyChecksum
(
const
SocketMapKey
&
key
,
unsigned
char
*
checksum
);
// Try to share the Socket to `key'. If the Socket does not exist, create one.
// The corresponding SocketId is written to `*id'. If this function returns
// successfully, SocketMapRemove() MUST be called when the Socket is not needed.
// Return 0 on success, -1 otherwise.
int
SocketMapInsert
(
const
SocketMapKey
&
key
,
SocketId
*
id
);
int
SocketMapInsert
(
const
SocketMapKey
&
key
,
SocketId
*
id
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
);
inline
int
SocketMapInsert
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
std
::
shared_ptr
<
SocketSSLContext
>
empty_ptr
;
return
SocketMapInsert
(
key
,
id
,
empty_ptr
);
}
// Find the SocketId associated with `key'.
// Return 0 on found, -1 otherwise.
...
...
@@ -110,7 +137,13 @@ public:
SocketMap
();
~
SocketMap
();
int
Init
(
const
SocketMapOptions
&
);
int
Insert
(
const
SocketMapKey
&
key
,
SocketId
*
id
);
int
Insert
(
const
SocketMapKey
&
key
,
SocketId
*
id
,
const
std
::
shared_ptr
<
SocketSSLContext
>&
ssl_ctx
);
int
Insert
(
const
SocketMapKey
&
key
,
SocketId
*
id
)
{
std
::
shared_ptr
<
SocketSSLContext
>
empty_ptr
;
return
Insert
(
key
,
id
,
empty_ptr
);
}
void
Remove
(
const
SocketMapKey
&
key
,
SocketId
expected_id
);
int
Find
(
const
SocketMapKey
&
key
,
SocketId
*
id
);
void
List
(
std
::
vector
<
SocketId
>*
ids
);
...
...
@@ -120,7 +153,7 @@ public:
private
:
void
RemoveInternal
(
const
SocketMapKey
&
key
,
SocketId
id
,
bool
remove_orphan
);
void
ListOrphans
(
int64_t
defer_us
,
std
::
vector
<
butil
::
EndPoint
>*
out
);
void
ListOrphans
(
int64_t
defer_us
,
std
::
vector
<
SocketMapKey
>*
out
);
void
WatchConnections
();
static
void
*
RunWatchConnections
(
void
*
);
void
Print
(
std
::
ostream
&
os
);
...
...
@@ -133,39 +166,10 @@ private:
int64_t
no_ref_us
;
};
// Store checksum of SocketMapKey instead of itself in order to:
// 1. Save precious space of key field in FlatMap
// 2. Simplify equivalence logic between SocketMapKeys
// (regard the hash collision to be zero)
struct
SocketMapKeyChecksum
{
explicit
SocketMapKeyChecksum
(
const
SocketMapKey
&
key
)
:
peer
(
key
.
peer
)
{
ComputeSocketMapKeyChecksum
(
key
,
checksum
);
}
butil
::
EndPoint
peer
;
unsigned
char
checksum
[
16
];
inline
bool
operator
==
(
const
SocketMapKeyChecksum
&
rhs
)
const
{
return
this
->
peer
==
rhs
.
peer
&&
memcmp
(
this
->
checksum
,
rhs
.
checksum
,
sizeof
(
checksum
))
==
0
;
}
};
struct
Checksum2Hash
{
std
::
size_t
operator
()(
const
SocketMapKeyChecksum
&
key
)
const
{
// Slice a subset of checksum over an evenly distributed hash
// won't affect the overall balance
std
::
size_t
hash
;
memcpy
(
&
hash
,
key
.
checksum
,
sizeof
(
hash
));
return
hash
;
}
};
// TODO: When RpcChannels connecting to one EndPoint are frequently created
// and destroyed, a single map+mutex may become hot-spots.
typedef
butil
::
FlatMap
<
SocketMapKey
Checksum
,
S
ingleConnection
,
Checksum2Hash
>
Map
;
typedef
butil
::
FlatMap
<
SocketMapKey
,
SingleConnection
,
S
ocketMapKeyHasher
>
Map
;
SocketMapOptions
_options
;
butil
::
Mutex
_mutex
;
Map
_map
;
...
...
src/brpc/ssl_option.cpp
View file @
62d0abd0
...
...
@@ -21,8 +21,7 @@ namespace brpc {
VerifyOptions
::
VerifyOptions
()
:
verify_depth
(
0
)
{}
ChannelSSLOptions
::
ChannelSSLOptions
()
:
enable
(
false
)
,
ciphers
(
"DEFAULT"
)
:
ciphers
(
"DEFAULT"
)
,
protocols
(
"TLSv1, TLSv1.1, TLSv1.2"
)
{}
...
...
src/brpc/ssl_option.h
View file @
62d0abd0
...
...
@@ -59,10 +59,6 @@ struct ChannelSSLOptions {
// Constructed with default options
ChannelSSLOptions
();
// Whether to enable SSL on the channel.
// Default: false
bool
enable
;
// Cipher suites used for SSL handshake.
// The format of this string should follow that in `man 1 cipers'.
// Default: "DEFAULT"
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment