Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
d729250a
Commit
d729250a
authored
Aug 02, 2017
by
gejun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
patch svn r34956
Change-Id: I6cde4f8c352144a3376cba0f92792462c755bda3
parent
efb67135
Hide whitespace changes
Inline
Side-by-side
Showing
25 changed files
with
61 additions
and
111 deletions
+61
-111
acceptor.cpp
brpc/acceptor.cpp
+7
-12
acceptor.h
brpc/acceptor.h
+3
-3
excluded_servers.h
brpc/details/excluded_servers.h
+3
-7
ssl_helper.cpp
brpc/details/ssl_helper.cpp
+4
-10
extension.h
brpc/extension.h
+1
-1
extension_inl.h
brpc/extension_inl.h
+0
-2
input_messenger.cpp
brpc/input_messenger.cpp
+0
-2
input_messenger.h
brpc/input_messenger.h
+1
-1
load_balancer.cpp
brpc/load_balancer.cpp
+2
-4
load_balancer.h
brpc/load_balancer.h
+1
-1
naming_service_thread.cpp
brpc/naming_service_thread.cpp
+0
-2
naming_service_thread.h
brpc/naming_service_thread.h
+1
-1
locality_aware_load_balancer.cpp
brpc/policy/locality_aware_load_balancer.cpp
+4
-8
locality_aware_load_balancer.h
brpc/policy/locality_aware_load_balancer.h
+1
-1
rtmp_protocol.cpp
brpc/policy/rtmp_protocol.cpp
+3
-7
rtmp_protocol.h
brpc/policy/rtmp_protocol.h
+2
-2
progressive_attachment.cpp
brpc/progressive_attachment.cpp
+3
-5
progressive_attachment.h
brpc/progressive_attachment.h
+1
-1
rtmp.cpp
brpc/rtmp.cpp
+10
-16
rtmp.h
brpc/rtmp.h
+3
-3
selective_channel.cpp
brpc/selective_channel.cpp
+2
-5
simple_data_pool.h
brpc/simple_data_pool.h
+4
-7
socket_map.cpp
brpc/socket_map.cpp
+3
-7
socket_map.h
brpc/socket_map.h
+1
-1
parallel_http.cpp
tools/parallel_http/parallel_http.cpp
+1
-2
No files found.
brpc/acceptor.cpp
View file @
d729250a
...
...
@@ -23,17 +23,13 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
,
_close_idle_tid
(
INVALID_BTHREAD
)
,
_listened_fd
(
-
1
)
,
_acception_id
(
0
)
,
_empty_cond
(
&
_map_mutex
)
,
_ssl_ctx
(
NULL
)
{
CHECK_EQ
(
0
,
pthread_mutex_init
(
&
_map_mutex
,
NULL
));
CHECK_EQ
(
0
,
pthread_cond_init
(
&
_empty_cond
,
NULL
));
}
Acceptor
::~
Acceptor
()
{
StopAccept
(
0
);
Join
();
CHECK_EQ
(
0
,
pthread_mutex_destroy
(
&
_map_mutex
));
CHECK_EQ
(
0
,
pthread_cond_destroy
(
&
_empty_cond
));
}
int
Acceptor
::
StartAccept
(
...
...
@@ -151,19 +147,18 @@ int Acceptor::Initialize() {
// NOTE: Join() can happen before StopAccept()
void
Acceptor
::
Join
()
{
pthread_mutex_lock
(
&
_map_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_map_mutex
);
if
(
_status
!=
STOPPING
&&
_status
!=
RUNNING
)
{
// no need to join.
pthread_mutex_unlock
(
&
_map_mutex
);
return
;
}
// `_listened_fd' will be set to -1 once it has been recycled
while
(
_listened_fd
>
0
||
!
_socket_map
.
empty
())
{
pthread_cond_wait
(
&
_empty_cond
,
&
_map_mutex
);
_empty_cond
.
Wait
(
);
}
const
int
saved_idle_timeout_sec
=
_idle_timeout_sec
;
_idle_timeout_sec
=
0
;
const
bthread_t
saved_close_idle_tid
=
_close_idle_tid
;
pthread_mutex_unlock
(
&
_map_mutex
);
mu
.
unlock
(
);
// Join the bthread outside lock.
if
(
saved_idle_timeout_sec
>
0
)
{
...
...
@@ -194,7 +189,7 @@ void Acceptor::ListConnections(std::vector<SocketId>* conn_list,
// ConnectionCount is inaccurate, enough space is reserved
conn_list
->
reserve
(
ConnectionCount
()
+
10
);
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_map_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_map_mutex
);
if
(
!
_socket_map
.
initialized
())
{
// Optional. Uninitialized FlatMap should be iteratable.
return
;
...
...
@@ -313,14 +308,14 @@ void Acceptor::BeforeRecycle(Socket* sock) {
// so that we are ensured no more events will arrive (and `Join'
// will return to its caller)
_listened_fd
=
-
1
;
pthread_cond_broadcast
(
&
_empty_cond
);
_empty_cond
.
Broadcast
(
);
return
;
}
// If a Socket could not be addressed shortly after its creation, it
// was not added into `_socket_map'.
_socket_map
.
erase
(
sock
->
id
());
if
(
_socket_map
.
empty
())
{
pthread_cond_broadcast
(
&
_empty_cond
);
_empty_cond
.
Broadcast
(
);
}
}
...
...
brpc/acceptor.h
View file @
d729250a
...
...
@@ -7,7 +7,7 @@
#ifndef BRPC_ACCEPTOR_H
#define BRPC_ACCEPTOR_H
#include
<pthread.h>
#include
"base/synchronization/condition_variable.h"
#include "base/containers/flat_map.h"
#include "brpc/input_messenger.h"
...
...
@@ -86,8 +86,8 @@ private:
// The Socket tso accept connections.
SocketId
_acception_id
;
pthread_mutex_t
_map_mutex
;
pthread_cond_t
_empty_cond
;
base
::
Mutex
_map_mutex
;
base
::
ConditionVariable
_empty_cond
;
// The map containing all the accepted sockets
SocketMap
_socket_map
;
...
...
brpc/details/excluded_servers.h
View file @
d729250a
...
...
@@ -38,16 +38,12 @@ public:
private
:
ExcludedServers
(
int
cap
)
:
_l
(
_space
,
sizeof
(
SocketId
)
*
cap
,
base
::
NOT_OWN_STORAGE
)
{
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
~
ExcludedServers
()
{
pthread_mutex_destroy
(
&
_mutex
);
}
:
_l
(
_space
,
sizeof
(
SocketId
)
*
cap
,
base
::
NOT_OWN_STORAGE
)
{}
~
ExcludedServers
()
{}
// Controller::_accessed may be shared by sub channels in schan, protect
// all mutable methods with this mutex. In ordinary channels, this mutex
// is never contended.
mutable
pthread_mutex_t
_mutex
;
mutable
base
::
Mutex
_mutex
;
base
::
BoundedQueue
<
SocketId
>
_l
;
SocketId
_space
[
0
];
};
...
...
brpc/details/ssl_helper.cpp
View file @
d729250a
...
...
@@ -22,7 +22,7 @@ namespace brpc {
// may crash probably due to some TLS data used inside OpenSSL
// Also according to performance test, there is little difference
// between pthread mutex and bthread mutex
static
pthread_mutex_t
*
g_ssl_mutexs
=
NULL
;
static
base
::
Mutex
*
g_ssl_mutexs
=
NULL
;
#ifndef OPENSSL_NO_DH
static
DH
*
g_dh_1024
=
NULL
;
...
...
@@ -480,20 +480,14 @@ static void SSLLockCallback(int mode, int n, const char* file, int line) {
// << (mode & CRYPTO_LOCK ? "locks" : "unlocks")
// << " thread=" << CRYPTO_thread_id();
if
(
mode
&
CRYPTO_LOCK
)
{
pthread_mutex_lock
(
&
g_ssl_mutexs
[
n
]
);
g_ssl_mutexs
[
n
].
lock
(
);
}
else
{
pthread_mutex_unlock
(
&
g_ssl_mutexs
[
n
]
);
g_ssl_mutexs
[
n
].
unlock
(
);
}
}
int
SSLThreadInit
()
{
g_ssl_mutexs
=
new
pthread_mutex_t
[
CRYPTO_num_locks
()];
for
(
int
i
=
0
;
i
<
CRYPTO_num_locks
();
++
i
)
{
if
(
pthread_mutex_init
(
&
g_ssl_mutexs
[
i
],
NULL
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to initialize pthread_mutex_t"
;
return
-
1
;
}
}
g_ssl_mutexs
=
new
base
::
Mutex
[
CRYPTO_num_locks
()];
CRYPTO_set_locking_callback
(
SSLLockCallback
);
#ifdef CRYPTO_LOCK_ECDH
CRYPTO_THREADID_set_callback
(
SSLGetThreadId
);
...
...
brpc/extension.h
View file @
d729250a
...
...
@@ -41,7 +41,7 @@ friend class base::GetLeakySingleton<Extension<T> >;
Extension
();
~
Extension
();
base
::
CaseIgnoredFlatMap
<
T
*>
_instance_map
;
pthread_mutex_t
_map_mutex
;
base
::
Mutex
_map_mutex
;
};
}
// namespace brpc
...
...
brpc/extension_inl.h
View file @
d729250a
...
...
@@ -19,13 +19,11 @@ Extension<T>* Extension<T>::instance() {
template
<
typename
T
>
Extension
<
T
>::
Extension
()
{
pthread_mutex_init
(
&
_map_mutex
,
NULL
);
_instance_map
.
init
(
29
);
}
template
<
typename
T
>
Extension
<
T
>::~
Extension
()
{
pthread_mutex_destroy
(
&
_map_mutex
);
}
template
<
typename
T
>
...
...
brpc/input_messenger.cpp
View file @
d729250a
...
...
@@ -328,7 +328,6 @@ InputMessenger::InputMessenger(size_t capacity)
,
_max_index
(
-
1
)
,
_non_protocol
(
false
)
,
_capacity
(
capacity
)
{
CHECK_EQ
(
0
,
pthread_mutex_init
(
&
_add_handler_mutex
,
NULL
));
}
InputMessenger
::~
InputMessenger
()
{
...
...
@@ -336,7 +335,6 @@ InputMessenger::~InputMessenger() {
_handlers
=
NULL
;
_max_index
.
store
(
-
1
,
base
::
memory_order_relaxed
);
_capacity
=
0
;
pthread_mutex_destroy
(
&
_add_handler_mutex
);
}
int
InputMessenger
::
AddHandler
(
const
InputMessageHandler
&
handler
)
{
...
...
brpc/input_messenger.h
View file @
d729250a
...
...
@@ -107,7 +107,7 @@ private:
bool
_non_protocol
;
size_t
_capacity
;
pthread_mutex_t
_add_handler_mutex
;
base
::
Mutex
_add_handler_mutex
;
};
// Get the global InputMessenger at client-side.
...
...
brpc/load_balancer.cpp
View file @
d729250a
...
...
@@ -23,12 +23,12 @@ void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
void
SharedLoadBalancer
::
ExposeLB
()
{
bool
changed
=
false
;
pthread_mutex_lock
(
&
_st_mutex
);
_st_mutex
.
lock
(
);
if
(
!
_exposed
)
{
_exposed
=
true
;
changed
=
true
;
}
pthread_mutex_unlock
(
&
_st_mutex
);
_st_mutex
.
unlock
(
);
if
(
changed
)
{
char
name
[
32
];
snprintf
(
name
,
sizeof
(
name
),
"_load_balancer_%d"
,
g_lb_counter
.
fetch_add
(
...
...
@@ -42,7 +42,6 @@ SharedLoadBalancer::SharedLoadBalancer()
,
_weight_sum
(
0
)
,
_exposed
(
false
)
,
_st
(
DescribeLB
,
this
)
{
pthread_mutex_init
(
&
_st_mutex
,
NULL
);
}
SharedLoadBalancer
::~
SharedLoadBalancer
()
{
...
...
@@ -51,7 +50,6 @@ SharedLoadBalancer::~SharedLoadBalancer() {
_lb
->
Destroy
();
_lb
=
NULL
;
}
pthread_mutex_destroy
(
&
_st_mutex
);
}
int
SharedLoadBalancer
::
Init
(
const
char
*
lb_name
)
{
...
...
brpc/load_balancer.h
View file @
d729250a
...
...
@@ -148,7 +148,7 @@ private:
LoadBalancer
*
_lb
;
base
::
atomic
<
int
>
_weight_sum
;
volatile
bool
_exposed
;
pthread_mutex_t
_st_mutex
;
base
::
Mutex
_st_mutex
;
bvar
::
PassiveStatus
<
std
::
string
>
_st
;
};
...
...
brpc/naming_service_thread.cpp
View file @
d729250a
...
...
@@ -193,7 +193,6 @@ NamingServiceThread::NamingServiceThread()
,
_source_ns
(
NULL
)
,
_ns
(
NULL
)
,
_actions
(
this
)
{
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
NamingServiceThread
::~
NamingServiceThread
()
{
...
...
@@ -232,7 +231,6 @@ NamingServiceThread::~NamingServiceThread() {
_ns
->
Destroy
();
_ns
=
NULL
;
}
pthread_mutex_destroy
(
&
_mutex
);
}
void
*
NamingServiceThread
::
RunThis
(
void
*
arg
)
{
...
...
brpc/naming_service_thread.h
View file @
d729250a
...
...
@@ -94,7 +94,7 @@ private:
const
std
::
vector
<
ServerNodeWithId
>&
src
,
std
::
vector
<
ServerId
>*
dst
,
const
NamingServiceFilter
*
filter
);
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
bthread_t
_tid
;
// TODO: better use a name.
const
NamingService
*
_source_ns
;
...
...
brpc/policy/locality_aware_load_balancer.cpp
View file @
d729250a
...
...
@@ -430,7 +430,7 @@ void LocalityAwareLoadBalancer::Destroy() {
}
void
LocalityAwareLoadBalancer
::
Weight
::
Describe
(
std
::
ostream
&
os
,
int64_t
now
)
{
pthread_mutex_lock
(
&
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
int64_t
begin_time_sum
=
_begin_time_sum
;
int
begin_time_count
=
_begin_time_count
;
int64_t
weight
=
_weight
;
...
...
@@ -447,7 +447,7 @@ void LocalityAwareLoadBalancer::Weight::Describe(std::ostream& os, int64_t now)
}
qps
=
n
*
1000000
/
(
double
)(
now
-
_time_q
.
top
()
->
end_time_us
);
}
pthread_mutex_unlock
(
&
_mutex
);
mu
.
unlock
(
);
os
<<
"weight="
<<
weight
;
if
(
base_weight
!=
weight
)
{
...
...
@@ -502,19 +502,16 @@ LocalityAwareLoadBalancer::Weight::Weight(int64_t initial_weight)
,
_avg_latency
(
0
)
,
_dev
(
0
)
,
_time_q
(
_time_q_items
,
sizeof
(
_time_q_items
),
base
::
NOT_OWN_STORAGE
)
{
CHECK_EQ
(
0
,
pthread_mutex_init
(
&
_mutex
,
NULL
));
}
LocalityAwareLoadBalancer
::
Weight
::~
Weight
()
{
pthread_mutex_destroy
(
&
_mutex
);
}
int64_t
LocalityAwareLoadBalancer
::
Weight
::
Disable
()
{
pthread_mutex_lock
(
&
_mutex
);
BAIDU_SCOPED_LOCK
(
_mutex
);
const
int64_t
saved
=
_weight
;
_base_weight
=
0
;
_weight
=
0
;
pthread_mutex_unlock
(
&
_mutex
);
return
saved
;
}
...
...
@@ -528,13 +525,12 @@ int64_t LocalityAwareLoadBalancer::Weight::MarkOld(size_t index) {
}
std
::
pair
<
int64_t
,
int64_t
>
LocalityAwareLoadBalancer
::
Weight
::
ClearOld
()
{
pthread_mutex_lock
(
&
_mutex
);
BAIDU_SCOPED_LOCK
(
_mutex
);
const
int64_t
old_weight
=
_old_weight
;
const
int64_t
diff
=
_old_diff_sum
;
_old_diff_sum
=
0
;
_old_index
=
(
size_t
)
-
1
;
_old_weight
=
0
;
pthread_mutex_unlock
(
&
_mutex
);
return
std
::
make_pair
(
old_weight
,
diff
);
}
...
...
brpc/policy/locality_aware_load_balancer.h
View file @
d729250a
...
...
@@ -86,7 +86,7 @@ private:
private
:
int64_t
_weight
;
int64_t
_base_weight
;
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
int64_t
_begin_time_sum
;
int
_begin_time_count
;
int64_t
_old_diff_sum
;
...
...
brpc/policy/rtmp_protocol.cpp
View file @
d729250a
...
...
@@ -720,8 +720,6 @@ RtmpContext::RtmpContext(const RtmpClientOptions* copt, const Server* server)
_service
=
server
->
options
().
rtmp_service
;
}
_free_ms_ids
.
reserve
(
32
);
pthread_mutex_init
(
&
_stream_mutex
,
NULL
);
pthread_mutex_init
(
&
_trans_mutex
,
NULL
);
CHECK_EQ
(
0
,
_mstream_map
.
init
(
1024
,
70
));
CHECK_EQ
(
0
,
_trans_map
.
init
(
1024
,
70
));
memset
(
_cstream_ctx
,
0
,
sizeof
(
_cstream_ctx
));
...
...
@@ -765,8 +763,6 @@ RtmpContext::~RtmpContext() {
free
(
_s1_digest
);
_s1_digest
=
NULL
;
pthread_mutex_destroy
(
&
_stream_mutex
);
pthread_mutex_destroy
(
&
_trans_mutex
);
}
void
RtmpContext
::
Destroy
()
{
...
...
@@ -926,7 +922,7 @@ bool RtmpContext::AddClientStream(RtmpStreamBase* stream) {
}
uint32_t
chunk_stream_id
=
0
;
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_stream_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_stream_mutex
);
MessageStreamInfo
&
info
=
_mstream_map
[
stream_id
];
if
(
info
.
stream
!=
NULL
)
{
mu
.
unlock
();
...
...
@@ -943,7 +939,7 @@ bool RtmpContext::AddClientStream(RtmpStreamBase* stream) {
bool
RtmpContext
::
AddServerStream
(
RtmpStreamBase
*
stream
)
{
uint32_t
stream_id
=
0
;
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_stream_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_stream_mutex
);
if
(
!
AllocateMessageStreamId
(
&
stream_id
))
{
return
false
;
}
...
...
@@ -974,7 +970,7 @@ bool RtmpContext::RemoveMessageStream(RtmpStreamBase* stream) {
// for deref the stream outside _stream_mutex.
base
::
intrusive_ptr
<
RtmpStreamBase
>
deref_ptr
;
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_stream_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_stream_mutex
);
MessageStreamInfo
*
info
=
_mstream_map
.
seek
(
stream_id
);
if
(
info
==
NULL
)
{
mu
.
unlock
();
...
...
brpc/policy/rtmp_protocol.h
View file @
d729250a
...
...
@@ -367,14 +367,14 @@ private:
RtmpService
*
_service
;
// Mapping message_stream_id to message streams.
pthread_mutex_t
_stream_mutex
;
base
::
Mutex
_stream_mutex
;
struct
MessageStreamInfo
{
base
::
intrusive_ptr
<
RtmpStreamBase
>
stream
;
};
base
::
FlatMap
<
uint32_t
,
MessageStreamInfo
>
_mstream_map
;
// Mapping transaction id to handlers.
pthread_mutex_t
_trans_mutex
;
base
::
Mutex
_trans_mutex
;
uint32_t
_trans_id_allocator
;
base
::
FlatMap
<
uint32_t
,
RtmpTransactionHandler
*>
_trans_map
;
...
...
brpc/progressive_attachment.cpp
View file @
d729250a
...
...
@@ -27,7 +27,6 @@ ProgressiveAttachment::ProgressiveAttachment(SocketUniquePtr& movable_httpsock,
,
_rpc_state
(
RPC_RUNNING
)
,
_notify_id
(
INVALID_BTHREAD_ID
)
{
_httpsock
.
swap
(
movable_httpsock
);
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
ProgressiveAttachment
::~
ProgressiveAttachment
()
{
...
...
@@ -55,7 +54,6 @@ ProgressiveAttachment::~ProgressiveAttachment() {
if
(
_notify_id
!=
INVALID_BTHREAD_ID
)
{
bthread_id_error
(
_notify_id
,
0
);
}
pthread_mutex_destroy
(
&
_mutex
);
}
static
char
s_hex_map
[]
=
{
'0'
,
'1'
,
'2'
,
'3'
,
'4'
,
'5'
,
'6'
,
'7'
,
'8'
,
...
...
@@ -115,7 +113,7 @@ int ProgressiveAttachment::Write(const base::IOBuf& data) {
int
rpc_state
=
_rpc_state
.
load
(
base
::
memory_order_acquire
);
if
(
rpc_state
==
RPC_RUNNING
)
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
rpc_state
=
_rpc_state
.
load
(
base
::
memory_order_acquire
);
if
(
rpc_state
==
RPC_RUNNING
)
{
if
(
_saved_buf
.
size
()
>=
(
size_t
)
FLAGS_socket_max_unwritten_bytes
||
...
...
@@ -148,7 +146,7 @@ int ProgressiveAttachment::Write(const void* data, size_t n) {
}
int
rpc_state
=
_rpc_state
.
load
(
base
::
memory_order_acquire
);
if
(
rpc_state
==
RPC_RUNNING
)
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
rpc_state
=
_rpc_state
.
load
(
base
::
memory_order_relaxed
);
if
(
rpc_state
==
RPC_RUNNING
)
{
if
(
_saved_buf
.
size
()
>=
(
size_t
)
FLAGS_socket_max_unwritten_bytes
||
...
...
@@ -190,7 +188,7 @@ void ProgressiveAttachment::MarkRPCAsDone(bool rpc_failed) {
int
ntry
=
0
;
bool
permanent_error
=
false
;
do
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
if
(
_saved_buf
.
empty
()
||
permanent_error
||
rpc_failed
)
{
base
::
IOBuf
tmp
;
tmp
.
swap
(
_saved_buf
);
// Clear _saved_buf outside lock.
...
...
brpc/progressive_attachment.h
View file @
d729250a
...
...
@@ -54,7 +54,7 @@ protected:
bool
_before_http_1_1
;
bool
_pause_from_mark_rpc_as_done
;
base
::
atomic
<
int
>
_rpc_state
;
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
SocketUniquePtr
_httpsock
;
base
::
IOBuf
_saved_buf
;
bthread_id_t
_notify_id
;
...
...
brpc/rtmp.cpp
View file @
d729250a
...
...
@@ -1205,11 +1205,9 @@ RtmpStreamBase::RtmpStreamBase(bool is_client)
,
_message_stream_id
(
0
)
,
_chunk_stream_id
(
0
)
,
_create_realtime_us
(
base
::
gettimeofday_us
())
{
pthread_mutex_init
(
&
_call_mutex
,
NULL
);
}
RtmpStreamBase
::~
RtmpStreamBase
()
{
pthread_mutex_destroy
(
&
_call_mutex
);
}
int
RtmpStreamBase
::
SendMessage
(
uint32_t
timestamp
,
...
...
@@ -1440,7 +1438,7 @@ void RtmpStreamBase::OnStop() {
}
bool
RtmpStreamBase
::
BeginProcessingMessage
(
const
char
*
fun_name
)
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_call_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_call_mutex
);
if
(
_stopped
)
{
mu
.
unlock
();
LOG
(
ERROR
)
<<
fun_name
<<
" is called after OnStop()"
;
...
...
@@ -1460,7 +1458,7 @@ bool RtmpStreamBase::BeginProcessingMessage(const char* fun_name) {
}
void
RtmpStreamBase
::
EndProcessingMessage
()
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_call_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_call_mutex
);
_processing_msg
=
false
;
if
(
_stopped
)
{
mu
.
unlock
();
...
...
@@ -1498,7 +1496,7 @@ void RtmpStreamBase::CallOnVideoMessage(RtmpVideoMessage* msg) {
void
RtmpStreamBase
::
CallOnStop
()
{
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_call_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_call_mutex
);
if
(
_stopped
)
{
mu
.
unlock
();
LOG
(
ERROR
)
<<
"OnStop() was called more than once"
;
...
...
@@ -1529,13 +1527,11 @@ RtmpClientStream::RtmpClientStream()
,
_created_stream_with_play_or_publish
(
false
)
,
_is_server_accepted
(
false
)
,
_state
(
STATE_UNINITIALIZED
)
{
pthread_mutex_init
(
&
_state_mutex
,
NULL
);
get_rtmp_bvars
()
->
client_stream_count
<<
1
;
_self_ref
.
reset
(
this
);
}
RtmpClientStream
::~
RtmpClientStream
()
{
pthread_mutex_destroy
(
&
_state_mutex
);
get_rtmp_bvars
()
->
client_stream_count
<<
-
1
;
}
...
...
@@ -1544,7 +1540,7 @@ void RtmpClientStream::Destroy() {
CallId
create_stream_rpc_id
=
INVALID_BTHREAD_ID
;
base
::
intrusive_ptr
<
RtmpClientStream
>
self_ref
;
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
switch
(
_state
)
{
case
STATE_UNINITIALIZED
:
_state
=
STATE_DESTROYING
;
...
...
@@ -1579,7 +1575,7 @@ void RtmpClientStream::Destroy() {
void
RtmpClientStream
::
SignalError
()
{
bthread_id_t
onfail_id
=
INVALID_BTHREAD_ID
;
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
switch
(
_state
)
{
case
STATE_UNINITIALIZED
:
_state
=
STATE_ERROR
;
...
...
@@ -1618,7 +1614,7 @@ void RtmpClientStream::CleanupSocketForStream(
void
RtmpClientStream
::
ReplaceSocketForStream
(
SocketUniquePtr
*
inout
,
Controller
*
cntl
)
{
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
if
(
_state
==
STATE_ERROR
||
_state
==
STATE_DESTROYING
)
{
cntl
->
SetFailed
(
EINVAL
,
"Fail to replace socket for stream, _state is error or destroying"
);
return
;
...
...
@@ -1663,7 +1659,7 @@ int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
void
RtmpClientStream
::
OnFailedToCreateStream
()
{
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
switch
(
_state
)
{
case
STATE_CREATING
:
_state
=
STATE_ERROR
;
...
...
@@ -1719,7 +1715,7 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
int
rc
=
0
;
bthread_id_t
onfail_id
=
INVALID_BTHREAD_ID
;
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
switch
(
_state
)
{
case
STATE_CREATING
:
CHECK
(
_rtmpsock
);
...
...
@@ -2046,7 +2042,7 @@ void RtmpClientStream::Init(const RtmpClient* client,
return
OnStopInternal
();
}
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
if
(
_state
==
STATE_DESTROYING
||
_state
==
STATE_ERROR
)
{
// already Destroy()-ed or SignalError()-ed
LOG
(
WARNING
)
<<
"RtmpClientStream="
<<
this
<<
" was already "
...
...
@@ -2073,7 +2069,7 @@ void RtmpClientStream::Init(const RtmpClient* client,
google
::
protobuf
::
Message
*
res
=
(
google
::
protobuf
::
Message
*
)
this
;
const
CallId
call_id
=
done
->
cntl
.
call_id
();
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_state_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_state_mutex
);
switch
(
_state
)
{
case
STATE_UNINITIALIZED
:
_state
=
STATE_CREATING
;
...
...
@@ -2133,14 +2129,12 @@ RtmpRetryingClientStream::RtmpRetryingClientStream()
,
_create_timer_id
(
0
)
,
_client_selector
(
NULL
)
{
get_rtmp_bvars
()
->
retrying_client_stream_count
<<
1
;
pthread_mutex_init
(
&
_stream_mutex
,
NULL
);
_self_ref
.
reset
(
this
);
}
RtmpRetryingClientStream
::~
RtmpRetryingClientStream
()
{
delete
_client_selector
;
_client_selector
=
NULL
;
pthread_mutex_destroy
(
&
_stream_mutex
);
get_rtmp_bvars
()
->
retrying_client_stream_count
<<
-
1
;
}
...
...
brpc/rtmp.h
View file @
d729250a
...
...
@@ -578,7 +578,7 @@ friend class policy::OnServerStreamCreated;
uint32_t
_chunk_stream_id
;
int64_t
_create_realtime_us
;
SocketUniquePtr
_rtmpsock
;
pthread_mutex_t
_call_mutex
;
base
::
Mutex
_call_mutex
;
};
struct
RtmpClientOptions
{
...
...
@@ -812,7 +812,7 @@ friend class RtmpRetryingClientStream;
STATE_DESTROYING
,
};
State
_state
;
pthread_mutex_t
_state_mutex
;
base
::
Mutex
_state_mutex
;
RtmpClientStreamOptions
_options
;
};
...
...
@@ -944,7 +944,7 @@ friend class InitSubStream;
base
::
intrusive_ptr
<
SubStream
>
_using_sub_stream
;
base
::
intrusive_ptr
<
RtmpRetryingClientStream
>
_self_ref
;
mutable
pthread_mutex_t
_stream_mutex
;
mutable
base
::
Mutex
_stream_mutex
;
RtmpRetryingClientStreamOptions
_options
;
base
::
atomic
<
bool
>
_destroying
;
base
::
atomic
<
bool
>
_called_on_stop
;
...
...
brpc/selective_channel.cpp
View file @
d729250a
...
...
@@ -65,9 +65,7 @@ public:
bool
need_feedback
;
};
ChannelBalancer
()
{
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
ChannelBalancer
()
{}
~
ChannelBalancer
();
int
Init
(
const
char
*
lb_name
);
int
AddChannel
(
ChannelBase
*
sub_channel
,
...
...
@@ -78,7 +76,7 @@ public:
void
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
);
private
:
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
// Find out duplicated sub channels.
ChannelToIdMap
_chan_map
;
};
...
...
@@ -143,7 +141,6 @@ private:
// ===============================================
ChannelBalancer
::~
ChannelBalancer
()
{
pthread_mutex_destroy
(
&
_mutex
);
for
(
ChannelToIdMap
::
iterator
it
=
_chan_map
.
begin
();
it
!=
_chan_map
.
end
();
++
it
)
{
SocketUniquePtr
ptr
(
it
->
second
);
// Dereference
...
...
brpc/simple_data_pool.h
View file @
d729250a
...
...
@@ -7,7 +7,7 @@
#ifndef BRPC_SIMPLE_DATA_POOL_H
#define BRPC_SIMPLE_DATA_POOL_H
#include
<pthread.h>
#include
"base/scoped_lock.h"
#include "brpc/data_factory.h"
...
...
@@ -35,7 +35,7 @@ public:
Stat
stat
()
const
;
private
:
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
unsigned
_capacity
;
unsigned
_size
;
base
::
atomic
<
unsigned
>
_ncreated
;
...
...
@@ -49,12 +49,10 @@ inline SimpleDataPool::SimpleDataPool(const DataFactory* factory)
,
_ncreated
(
0
)
,
_pool
(
NULL
)
,
_factory
(
factory
)
{
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
inline
SimpleDataPool
::~
SimpleDataPool
()
{
Reset
(
NULL
);
pthread_mutex_destroy
(
&
_mutex
);
}
inline
void
SimpleDataPool
::
Reset
(
const
DataFactory
*
factory
)
{
...
...
@@ -132,12 +130,12 @@ inline void SimpleDataPool::Return(void* data) {
if
(
data
==
NULL
)
{
return
;
}
pthread_mutex_lock
(
&
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
if
(
_capacity
==
_size
)
{
const
unsigned
new_cap
=
(
_capacity
==
0
?
128
:
(
_capacity
*
3
/
2
));
void
**
new_pool
=
(
void
**
)
malloc
(
new_cap
*
sizeof
(
void
*
));
if
(
NULL
==
new_pool
)
{
pthread_mutex_unlock
(
&
_mutex
);
mu
.
unlock
(
);
return
_factory
->
DestroyData
(
data
);
}
if
(
_pool
)
{
...
...
@@ -148,7 +146,6 @@ inline void SimpleDataPool::Return(void* data) {
_pool
=
new_pool
;
}
_pool
[
_size
++
]
=
data
;
pthread_mutex_unlock
(
&
_mutex
);
}
inline
SimpleDataPool
::
Stat
SimpleDataPool
::
stat
()
const
{
...
...
brpc/socket_map.cpp
View file @
d729250a
...
...
@@ -117,7 +117,6 @@ SocketMap::SocketMap()
:
_exposed_in_bvar
(
false
)
,
_this_map_bvar
(
NULL
)
,
_has_close_idle_thread
(
false
)
{
pthread_mutex_init
(
&
_mutex
,
NULL
);
}
SocketMap
::~
SocketMap
()
{
...
...
@@ -127,7 +126,6 @@ SocketMap::~SocketMap() {
bthread_join
(
_close_idle_thread
,
NULL
);
}
if
(
!
_map
.
empty
())
{
pthread_mutex_lock
(
&
_mutex
);
int
nleft
=
0
;
for
(
Map
::
iterator
it
=
_map
.
begin
();
it
!=
_map
.
end
();
++
it
)
{
SingleConnection
*
sc
=
&
it
->
second
;
...
...
@@ -141,7 +139,6 @@ SocketMap::~SocketMap() {
LOG
(
ERROR
)
<<
' '
<<
*
sc
->
socket
<<
noflush
;
}
}
pthread_mutex_unlock
(
&
_mutex
);
if
(
nleft
)
{
LOG
(
ERROR
);
}
...
...
@@ -152,7 +149,6 @@ SocketMap::~SocketMap() {
}
delete
_options
.
socket_creator
;
_options
.
socket_creator
=
NULL
;
pthread_mutex_destroy
(
&
_mutex
);
}
int
SocketMap
::
Init
(
const
SocketMapOptions
&
options
)
{
...
...
@@ -185,7 +181,7 @@ void SocketMap::Print(std::ostream& os) {
// TODO: Elaborate.
size_t
count
=
0
;
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
count
=
_map
.
size
();
}
os
<<
"count="
<<
count
;
...
...
@@ -196,7 +192,7 @@ void SocketMap::PrintSocketMap(std::ostream& os, void* arg) {
}
int
SocketMap
::
Insert
(
const
base
::
EndPoint
&
pt
,
SocketId
*
id
)
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
SingleConnection
*
sc
=
_map
.
seek
(
pt
);
if
(
sc
)
{
if
(
!
sc
->
socket
->
Failed
()
||
...
...
@@ -253,7 +249,7 @@ void SocketMap::Remove(const base::EndPoint& pt, SocketId expected_id) {
void
SocketMap
::
RemoveInternal
(
const
base
::
EndPoint
&
pt
,
SocketId
expected_id
,
bool
remove_orphan
)
{
std
::
unique_lock
<
pthread_mutex_t
>
mu
(
_mutex
);
std
::
unique_lock
<
base
::
Mutex
>
mu
(
_mutex
);
SingleConnection
*
sc
=
_map
.
seek
(
pt
);
if
(
!
sc
)
{
return
;
...
...
brpc/socket_map.h
View file @
d729250a
...
...
@@ -103,7 +103,7 @@ private:
// and destroyed, a single map+mutex may become hot-spots.
typedef
base
::
FlatMap
<
base
::
EndPoint
,
SingleConnection
>
Map
;
SocketMapOptions
_options
;
pthread_mutex_t
_mutex
;
base
::
Mutex
_mutex
;
Map
_map
;
bool
_exposed_in_bvar
;
bvar
::
PassiveStatus
<
std
::
string
>*
_this_map_bvar
;
...
...
tools/parallel_http/parallel_http.cpp
View file @
d729250a
...
...
@@ -23,7 +23,7 @@ struct AccessThreadArgs {
const
std
::
deque
<
std
::
string
>*
url_list
;
size_t
offset
;
std
::
deque
<
std
::
pair
<
std
::
string
,
base
::
IOBuf
>
>
output_queue
;
pthread_mutex_t
output_queue_mutex
;
base
::
Mutex
output_queue_mutex
;
base
::
atomic
<
int
>
current_concurrency
;
};
...
...
@@ -124,7 +124,6 @@ int main(int argc, char** argv) {
for
(
int
i
=
0
;
i
<
FLAGS_thread_num
;
++
i
)
{
args
[
i
].
url_list
=
&
url_list
;
args
[
i
].
offset
=
i
;
pthread_mutex_init
(
&
args
[
i
].
output_queue_mutex
,
NULL
);
args
[
i
].
current_concurrency
.
store
(
0
,
base
::
memory_order_relaxed
);
}
std
::
vector
<
bthread_t
>
tids
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment