Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
bbae67df
Commit
bbae67df
authored
Aug 09, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Code style improvements
parent
b102e304
Hide whitespace changes
Inline
Side-by-side
Showing
27 changed files
with
133 additions
and
139 deletions
+133
-139
atomic_counter.hpp
src/atomic_counter.hpp
+1
-8
atomic_ptr.hpp
src/atomic_ptr.hpp
+2
-8
blob.hpp
src/blob.hpp
+6
-6
dealer.cpp
src/dealer.cpp
+1
-1
err.cpp
src/err.cpp
+2
-1
ip.cpp
src/ip.cpp
+1
-1
ipc_listener.cpp
src/ipc_listener.cpp
+1
-3
ipc_listener.hpp
src/ipc_listener.hpp
+1
-1
macros.hpp
src/macros.hpp
+8
-0
null_mechanism.cpp
src/null_mechanism.cpp
+4
-7
pipe.cpp
src/pipe.cpp
+1
-1
pipe.hpp
src/pipe.hpp
+3
-3
plain_client.cpp
src/plain_client.cpp
+8
-6
proxy.cpp
src/proxy.cpp
+14
-17
random.cpp
src/random.cpp
+2
-0
raw_decoder.cpp
src/raw_decoder.cpp
+3
-3
raw_decoder.hpp
src/raw_decoder.hpp
+1
-1
router.cpp
src/router.cpp
+6
-3
router.hpp
src/router.hpp
+2
-2
signaler.cpp
src/signaler.cpp
+15
-15
socket_base.cpp
src/socket_base.cpp
+30
-33
socket_base.hpp
src/socket_base.hpp
+7
-6
socks.cpp
src/socks.cpp
+2
-1
tcp.cpp
src/tcp.cpp
+1
-1
tcp_address.cpp
src/tcp_address.cpp
+1
-1
trie.cpp
src/trie.cpp
+9
-9
trie.hpp
src/trie.hpp
+1
-1
No files found.
src/atomic_counter.hpp
View file @
bbae67df
...
...
@@ -31,6 +31,7 @@
#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__
#include "stdint.hpp"
#include "macros.hpp"
#if defined ZMQ_FORCE_MUTEXES
#define ZMQ_ATOMIC_COUNTER_MUTEX
...
...
@@ -66,14 +67,6 @@
#include <arch/atomic.h>
#endif
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
namespace
zmq
{
// This class represents an integer that can be incremented/decremented
...
...
src/atomic_ptr.hpp
View file @
bbae67df
...
...
@@ -30,6 +30,8 @@
#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#include "macros.hpp"
#if defined ZMQ_FORCE_MUTEXES
#define ZMQ_ATOMIC_PTR_MUTEX
#elif defined ZMQ_HAVE_ATOMIC_INTRINSICS
...
...
@@ -64,14 +66,6 @@
#include <arch/atomic.h>
#endif
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
namespace
zmq
{
#if !defined ZMQ_ATOMIC_PTR_CXX11
...
...
src/blob.hpp
View file @
bbae67df
...
...
@@ -30,6 +30,7 @@
#ifndef __ZMQ_BLOB_HPP_INCLUDED__
#define __ZMQ_BLOB_HPP_INCLUDED__
#include "macros.hpp"
#include "err.hpp"
#include <stdlib.h>
...
...
@@ -116,7 +117,7 @@ struct blob_t
// Defines an order relationship on blob_t.
bool
operator
<
(
blob_t
const
&
other_
)
const
{
int
cmpres
=
const
int
cmpres
=
memcmp
(
_data
,
other_
.
_data
,
std
::
min
(
_size
,
other_
.
_size
));
return
cmpres
<
0
||
(
cmpres
==
0
&&
_size
<
other_
.
_size
);
}
...
...
@@ -164,14 +165,13 @@ struct blob_t
blob_t
(
const
blob_t
&
)
=
delete
;
blob_t
&
operator
=
(
const
blob_t
&
)
=
delete
;
blob_t
(
blob_t
&&
other_
)
:
_data
(
other_
.
_data
),
_size
(
other_
.
_size
),
_owned
(
other_
.
_owned
)
blob_t
(
blob_t
&&
other_
)
ZMQ_NOEXCEPT
:
_data
(
other_
.
_data
),
_size
(
other_
.
_size
),
_owned
(
other_
.
_owned
)
{
other_
.
_owned
=
false
;
}
blob_t
&
operator
=
(
blob_t
&&
other_
)
blob_t
&
operator
=
(
blob_t
&&
other_
)
ZMQ_NOEXCEPT
{
if
(
this
!=
&
other_
)
{
clear
();
...
...
src/dealer.cpp
View file @
bbae67df
...
...
@@ -60,7 +60,7 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_,
rc
=
pipe_
->
write
(
&
probe_msg
);
// zmq_assert (rc) is not applicable here, since it is not a bug.
(
void
)
rc
;
LIBZMQ_UNUSED
(
rc
)
;
pipe_
->
flush
();
...
...
src/err.cpp
View file @
bbae67df
...
...
@@ -29,6 +29,7 @@
#include "precompiled.hpp"
#include "err.hpp"
#include "macros.hpp"
const
char
*
zmq
::
errno_to_string
(
int
errno_
)
{
...
...
@@ -82,7 +83,7 @@ void zmq::zmq_abort (const char *errmsg_)
extra_info
[
0
]
=
(
ULONG_PTR
)
errmsg_
;
RaiseException
(
0x40000015
,
EXCEPTION_NONCONTINUABLE
,
1
,
extra_info
);
#else
(
void
)
errmsg_
;
LIBZMQ_UNUSED
(
errmsg_
)
;
print_backtrace
();
abort
();
#endif
...
...
src/ip.cpp
View file @
bbae67df
...
...
@@ -125,7 +125,7 @@ void zmq::unblock_socket (fd_t s_)
void
zmq
::
enable_ipv4_mapping
(
fd_t
s_
)
{
(
void
)
s_
;
LIBZMQ_UNUSED
(
s_
)
;
#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD
#ifdef ZMQ_HAVE_WINDOWS
...
...
src/ipc_listener.cpp
View file @
bbae67df
...
...
@@ -117,9 +117,7 @@ int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
path_
.
assign
(
&
buffer
[
0
]);
file_
=
path_
+
"/socket"
;
#else
// Silence -Wunused-parameter. #pragma and __attribute__((unused)) are not
// very portable unfortunately...
(
void
)
path_
;
LIBZMQ_UNUSED
(
path_
);
int
fd
=
mkstemp
(
&
buffer
[
0
]);
if
(
fd
==
-
1
)
return
-
1
;
...
...
src/ipc_listener.hpp
View file @
bbae67df
...
...
@@ -76,7 +76,7 @@ class ipc_listener_t : public own_t, public io_object_t
// Filter new connections if the OS provides a mechanism to get
// the credentials of the peer process. Called from accept().
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
bool
filter
(
fd_t
sock
);
bool
filter
(
fd_t
sock
_
);
#endif
// Accept the new connection. Returns the file descriptor of the
...
...
src/macros.hpp
View file @
bbae67df
...
...
@@ -11,3 +11,11 @@
}
/******************************************************************************/
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
src/null_mechanism.cpp
View file @
bbae67df
...
...
@@ -215,16 +215,13 @@ int zmq::null_mechanism_t::zap_msg_available ()
zmq
::
mechanism_t
::
status_t
zmq
::
null_mechanism_t
::
status
()
const
{
if
(
_ready_command_sent
&&
_ready_command_received
)
return
ready
;
const
bool
command_sent
=
_ready_command_sent
||
_error_command_sent
;
const
bool
command_received
=
_ready_command_received
||
_error_command_received
;
if
(
_ready_command_sent
&&
_ready_command_received
)
return
mechanism_t
::
ready
;
if
(
command_sent
&&
command_received
)
return
error
;
else
return
handshaking
;
return
command_sent
&&
command_received
?
error
:
handshaking
;
}
void
zmq
::
null_mechanism_t
::
send_zap_request
()
...
...
src/pipe.cpp
View file @
bbae67df
...
...
@@ -409,7 +409,7 @@ void zmq::pipe_t::terminate (bool delay_)
}
// The simple sync termination case. Ask the peer to terminate and wait
// for the ack.
else
if
(
_state
==
active
)
{
if
(
_state
==
active
)
{
send_pipe_term
(
_peer
);
_state
=
term_req_sent1
;
}
...
...
src/pipe.hpp
View file @
bbae67df
...
...
@@ -85,11 +85,11 @@ class pipe_t : public object_t,
void
set_event_sink
(
i_pipe_events
*
sink_
);
// Pipe endpoint can store an routing ID to be used by its clients.
void
set_server_socket_routing_id
(
uint32_t
routing_id_
);
void
set_server_socket_routing_id
(
uint32_t
server_socket_
routing_id_
);
uint32_t
get_server_socket_routing_id
()
const
;
// Pipe endpoint can store an opaque ID to be used by its clients.
void
set_router_socket_routing_id
(
const
blob_t
&
identity
_
);
void
set_router_socket_routing_id
(
const
blob_t
&
router_socket_routing_id
_
);
const
blob_t
&
get_routing_id
()
const
;
const
blob_t
&
get_credential
()
const
;
...
...
@@ -168,7 +168,7 @@ class pipe_t : public object_t,
// Pipepair uses this function to let us know about
// the peer pipe object.
void
set_peer
(
pipe_t
*
p
ipe
_
);
void
set_peer
(
pipe_t
*
p
eer
_
);
// Destructor is private. Pipe objects destroy themselves.
~
pipe_t
();
...
...
src/plain_client.cpp
View file @
bbae67df
...
...
@@ -105,12 +105,14 @@ int zmq::plain_client_t::process_handshake_command (msg_t *msg_)
zmq
::
mechanism_t
::
status_t
zmq
::
plain_client_t
::
status
()
const
{
if
(
_state
==
ready
)
return
mechanism_t
::
ready
;
if
(
_state
==
error_command_received
)
return
mechanism_t
::
error
;
else
return
mechanism_t
::
handshaking
;
switch
(
_state
)
{
case
ready
:
return
mechanism_t
::
ready
;
case
error_command_received
:
return
mechanism_t
::
error
;
default
:
return
mechanism_t
::
handshaking
;
}
}
void
zmq
::
plain_client_t
::
produce_hello
(
msg_t
*
msg_
)
const
...
...
src/proxy.cpp
View file @
bbae67df
...
...
@@ -90,7 +90,7 @@ typedef struct
// Utility functions
int
capture
(
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
,
zmq
::
msg_t
*
msg_
,
int
more_
=
0
)
{
// Copy message to capture socket if any
...
...
@@ -99,7 +99,7 @@ int capture (class zmq::socket_base_t *capture_,
int
rc
=
ctrl
.
init
();
if
(
unlikely
(
rc
<
0
))
return
-
1
;
rc
=
ctrl
.
copy
(
msg_
);
rc
=
ctrl
.
copy
(
*
msg_
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
rc
=
capture_
->
send
(
&
ctrl
,
more_
?
ZMQ_SNDMORE
:
0
);
...
...
@@ -114,17 +114,17 @@ int forward (class zmq::socket_base_t *from_,
class
zmq
::
socket_base_t
*
to_
,
zmq_socket_stats_t
*
to_stats_
,
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
)
zmq
::
msg_t
*
msg_
)
{
int
more
;
size_t
moresz
;
size_t
complete_msg_size
=
0
;
while
(
true
)
{
int
rc
=
from_
->
recv
(
&
msg_
,
0
);
int
rc
=
from_
->
recv
(
msg_
,
0
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
complete_msg_size
+=
msg_
.
size
();
complete_msg_size
+=
msg_
->
size
();
moresz
=
sizeof
more
;
rc
=
from_
->
getsockopt
(
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
...
...
@@ -136,7 +136,7 @@ int forward (class zmq::socket_base_t *from_,
if
(
unlikely
(
rc
<
0
))
return
-
1
;
rc
=
to_
->
send
(
&
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
rc
=
to_
->
send
(
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
...
...
@@ -163,7 +163,7 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
// VSM of 8 bytes can't fail to init
msg
.
init_size
(
sizeof
(
uint64_t
));
memcpy
(
msg
.
data
(),
(
const
void
*
)
&
stat_
,
sizeof
(
uint64_t
));
memcpy
(
msg
.
data
(),
&
stat_
,
sizeof
(
uint64_t
));
// if the first message is handed to the pipe successfully then the HWM
// is not full, which means failures are due to interrupts (on Windows pipes
...
...
@@ -366,11 +366,8 @@ int zmq::proxy (class socket_base_t *frontend_,
}
}
int
i
;
bool
request_processed
,
reply_processed
;
while
(
state
!=
terminated
)
{
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
...
...
@@ -387,7 +384,7 @@ int zmq::proxy (class socket_base_t *frontend_,
CHECK_RC_EXIT_ON_FAILURE
();
// Process events.
for
(
i
=
0
;
i
<
rc
;
i
++
)
{
for
(
i
nt
i
=
0
;
i
<
rc
;
i
++
)
{
if
(
events
[
i
].
socket
==
frontend_
)
{
frontend_in
=
(
events
[
i
].
events
&
ZMQ_POLLIN
)
!=
0
;
frontend_out
=
(
events
[
i
].
events
&
ZMQ_POLLOUT
)
!=
0
;
...
...
@@ -413,7 +410,7 @@ int zmq::proxy (class socket_base_t *frontend_,
}
// Copy message to capture socket if any.
rc
=
capture
(
capture_
,
msg
);
rc
=
capture
(
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
if
(
msg
.
size
()
==
5
&&
memcmp
(
msg
.
data
(),
"PAUSE"
,
5
)
==
0
)
{
...
...
@@ -452,7 +449,7 @@ int zmq::proxy (class socket_base_t *frontend_,
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if
(
frontend_in
&&
(
backend_out
||
frontend_equal_to_backend
))
{
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
&
backend_stats
,
capture_
,
msg
);
&
backend_stats
,
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
request_processed
=
true
;
frontend_in
=
backend_out
=
false
;
...
...
@@ -465,7 +462,7 @@ int zmq::proxy (class socket_base_t *frontend_,
// design in 'for' event processing loop.
if
(
backend_in
&&
frontend_out
)
{
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
&
frontend_stats
,
capture_
,
msg
);
&
frontend_stats
,
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
reply_processed
=
true
;
backend_in
=
frontend_out
=
false
;
...
...
@@ -595,7 +592,7 @@ int zmq::proxy (class socket_base_t *frontend_,
return
close_and_return
(
&
msg
,
-
1
);
// Copy message to capture socket if any
rc
=
capture
(
capture_
,
msg
);
rc
=
capture
(
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
...
...
@@ -628,7 +625,7 @@ int zmq::proxy (class socket_base_t *frontend_,
if
(
state
==
active
&&
items
[
0
].
revents
&
ZMQ_POLLIN
&&
(
frontend_
==
backend_
||
itemsout
[
1
].
revents
&
ZMQ_POLLOUT
))
{
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
&
backend_stats
,
capture_
,
msg
);
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
}
...
...
@@ -637,7 +634,7 @@ int zmq::proxy (class socket_base_t *frontend_,
&&
items
[
1
].
revents
&
ZMQ_POLLIN
&&
itemsout
[
0
].
revents
&
ZMQ_POLLOUT
)
{
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
&
frontend_stats
,
capture_
,
msg
);
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
}
...
...
src/random.cpp
View file @
bbae67df
...
...
@@ -154,6 +154,8 @@ static void manage_random (bool init_)
}
else
{
randombytes_close
();
}
#else
LIBZMQ_UNUSED
(
init_
);
#endif
}
...
...
src/raw_decoder.cpp
View file @
bbae67df
...
...
@@ -36,13 +36,13 @@
zmq
::
raw_decoder_t
::
raw_decoder_t
(
size_t
bufsize_
)
:
_allocator
(
bufsize_
,
1
)
{
int
rc
=
_in_progress
.
init
();
const
int
rc
=
_in_progress
.
init
();
errno_assert
(
rc
==
0
);
}
zmq
::
raw_decoder_t
::~
raw_decoder_t
()
{
int
rc
=
_in_progress
.
close
();
const
int
rc
=
_in_progress
.
close
();
errno_assert
(
rc
==
0
);
}
...
...
@@ -56,7 +56,7 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_,
size_t
size_
,
size_t
&
bytes_used_
)
{
int
rc
=
const
int
rc
=
_in_progress
.
init
(
const_cast
<
unsigned
char
*>
(
data_
),
size_
,
shared_message_memory_allocator
::
call_dec_ref
,
_allocator
.
buffer
(),
_allocator
.
provide_content
());
...
...
src/raw_decoder.hpp
View file @
bbae67df
...
...
@@ -50,7 +50,7 @@ class raw_decoder_t : public i_decoder
virtual
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
);
virtual
int
decode
(
const
unsigned
char
*
data_
,
size_t
size_
,
size_t
&
proces
sed_
);
decode
(
const
unsigned
char
*
data_
,
size_t
size_
,
size_t
&
bytes_u
sed_
);
virtual
msg_t
*
msg
()
{
return
&
_in_progress
;
}
...
...
src/router.cpp
View file @
bbae67df
...
...
@@ -397,9 +397,9 @@ bool zmq::router_t::xhas_in ()
return
true
;
}
static
bool
check_pipe_hwm
(
const
zmq
::
pipe_t
&
pipe
)
static
bool
check_pipe_hwm
(
const
zmq
::
pipe_t
&
pipe
_
)
{
return
pipe
.
check_hwm
();
return
pipe
_
.
check_hwm
();
}
bool
zmq
::
router_t
::
xhas_out
()
...
...
@@ -424,7 +424,10 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
{
int
res
=
0
;
blob_t
routing_id_blob
((
unsigned
char
*
)
routing_id_
,
routing_id_size_
);
// TODO remove the const_cast, see comment in lookup_out_pipe
const
blob_t
routing_id_blob
(
static_cast
<
unsigned
char
*>
(
const_cast
<
void
*>
(
routing_id_
)),
routing_id_size_
);
const
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
routing_id_blob
);
if
(
!
out_pipe
)
{
errno
=
EHOSTUNREACH
;
...
...
src/router.hpp
View file @
bbae67df
...
...
@@ -62,7 +62,7 @@ class router_t : public routing_socket_base_t
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
int
get_peer_state
(
const
void
*
identity_
,
size_t
identity
_size_
)
const
;
int
get_peer_state
(
const
void
*
routing_id_
,
size_t
routing_id
_size_
)
const
;
protected
:
// Rollback any message parts that were sent but not yet flushed.
...
...
@@ -71,7 +71,7 @@ class router_t : public routing_socket_base_t
private
:
// Receive peer id and update lookup map
bool
identify_peer
(
pipe_t
*
pipe_
,
bool
locally_initiated
);
bool
identify_peer
(
pipe_t
*
pipe_
,
bool
locally_initiated
_
);
// Fair queueing object for inbound pipes.
fq_t
_fq
;
...
...
src/signaler.cpp
View file @
bbae67df
...
...
@@ -242,7 +242,7 @@ int zmq::signaler_t::wait (int timeout_)
struct
pollfd
pfd
;
pfd
.
fd
=
_r
;
pfd
.
events
=
POLLIN
;
int
rc
=
poll
(
&
pfd
,
1
,
timeout_
);
const
int
rc
=
poll
(
&
pfd
,
1
,
timeout_
);
if
(
unlikely
(
rc
<
0
))
{
errno_assert
(
errno
==
EINTR
);
return
-
1
;
...
...
@@ -319,7 +319,7 @@ void zmq::signaler_t::recv ()
#else
unsigned
char
dummy
;
#if defined ZMQ_HAVE_WINDOWS
int
nbytes
=
const
int
nbytes
=
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
wsa_assert
(
nbytes
!=
SOCKET_ERROR
);
#elif defined ZMQ_HAVE_VXWORKS
...
...
@@ -343,24 +343,24 @@ int zmq::signaler_t::recv_failable ()
if
(
sz
==
-
1
)
{
errno_assert
(
errno
==
EAGAIN
);
return
-
1
;
}
else
{
errno_assert
(
sz
==
sizeof
(
dummy
));
// If we accidentally grabbed the next signal(s) along with the current
// one, return it back to the eventfd object.
if
(
unlikely
(
dummy
>
1
))
{
const
uint64_t
inc
=
dummy
-
1
;
ssize_t
sz2
=
write
(
_w
,
&
inc
,
sizeof
(
inc
));
errno_assert
(
sz2
==
sizeof
(
inc
));
return
0
;
}
}
errno_assert
(
sz
==
sizeof
(
dummy
));
zmq_assert
(
dummy
==
1
);
// If we accidentally grabbed the next signal(s) along with the current
// one, return it back to the eventfd object.
if
(
unlikely
(
dummy
>
1
))
{
const
uint64_t
inc
=
dummy
-
1
;
ssize_t
sz2
=
write
(
_w
,
&
inc
,
sizeof
(
inc
));
errno_assert
(
sz2
==
sizeof
(
inc
));
return
0
;
}
zmq_assert
(
dummy
==
1
);
#else
unsigned
char
dummy
;
#if defined ZMQ_HAVE_WINDOWS
int
nbytes
=
const
int
nbytes
=
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
if
(
nbytes
==
SOCKET_ERROR
)
{
const
int
last_error
=
WSAGetLastError
();
...
...
src/socket_base.cpp
View file @
bbae67df
...
...
@@ -97,7 +97,7 @@
#include "scatter.hpp"
#include "dgram.hpp"
bool
zmq
::
socket_base_t
::
check_tag
()
bool
zmq
::
socket_base_t
::
check_tag
()
const
{
return
_tag
==
0xbaddecaf
;
}
...
...
@@ -253,7 +253,7 @@ zmq::socket_base_t::~socket_base_t ()
zmq_assert
(
_destroyed
);
}
zmq
::
i_mailbox
*
zmq
::
socket_base_t
::
get_mailbox
()
zmq
::
i_mailbox
*
zmq
::
socket_base_t
::
get_mailbox
()
const
{
return
_mailbox
;
}
...
...
@@ -274,7 +274,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
zmq_assert
(
uri_
!=
NULL
);
std
::
string
uri
(
uri_
);
std
::
string
::
size_type
pos
=
uri
.
find
(
"://"
);
const
std
::
string
::
size_type
pos
=
uri
.
find
(
"://"
);
if
(
pos
==
std
::
string
::
npos
)
{
errno
=
EINVAL
;
return
-
1
;
...
...
@@ -289,7 +289,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
return
0
;
}
int
zmq
::
socket_base_t
::
check_protocol
(
const
std
::
string
&
protocol_
)
int
zmq
::
socket_base_t
::
check_protocol
(
const
std
::
string
&
protocol_
)
const
{
// First check out whether the protocol is something we are aware of.
if
(
protocol_
!=
"inproc"
...
...
@@ -413,7 +413,7 @@ int zmq::socket_base_t::getsockopt (int option_,
}
if
(
option_
==
ZMQ_EVENTS
)
{
int
rc
=
process_commands
(
0
,
false
);
const
int
rc
=
process_commands
(
0
,
false
);
if
(
rc
!=
0
&&
(
errno
==
EINTR
||
errno
==
ETERM
))
{
return
-
1
;
}
...
...
@@ -439,20 +439,14 @@ int zmq::socket_base_t::join (const char *group_)
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
int
rc
=
xjoin
(
group_
);
return
rc
;
return
xjoin
(
group_
);
}
int
zmq
::
socket_base_t
::
leave
(
const
char
*
group_
)
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
int
rc
=
xleave
(
group_
);
return
rc
;
return
xleave
(
group_
);
}
void
zmq
::
socket_base_t
::
add_signaler
(
signaler_t
*
s_
)
...
...
@@ -589,7 +583,8 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
return
0
;
}
...
...
@@ -610,7 +605,8 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
return
0
;
}
...
...
@@ -630,7 +626,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
addr_
,
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
addr_
,
static_cast
<
own_t
*>
(
listener
)
,
NULL
);
options
.
connected
=
true
;
return
0
;
}
...
...
@@ -649,7 +645,8 @@ int zmq::socket_base_t::bind (const char *addr_)
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
return
0
;
}
...
...
@@ -687,7 +684,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
endpoint_t
peer
=
find_endpoint
(
addr_
);
const
endpoint_t
peer
=
find_endpoint
(
addr_
);
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
...
...
@@ -706,7 +703,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t
*
parents
[
2
]
=
{
this
,
peer
.
socket
==
NULL
?
this
:
peer
.
socket
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
bool
conflate
=
const
bool
conflate
=
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
...
...
@@ -733,7 +730,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
...
...
@@ -748,7 +745,7 @@ int zmq::socket_base_t::connect (const char *addr_)
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
}
...
...
@@ -761,7 +758,7 @@ int zmq::socket_base_t::connect (const char *addr_)
memcpy
(
id
.
data
(),
peer
.
options
.
routing_id
,
peer
.
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
1
]
->
flush
();
}
...
...
@@ -935,9 +932,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. (same for NORM, currently?)
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
||
protocol
==
protocol_name
::
udp
;
const
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
||
protocol
==
protocol_name
::
udp
;
pipe_t
*
newpipe
=
NULL
;
if
(
options
.
immediate
!=
1
||
subscribe_to_all
)
{
...
...
@@ -945,7 +942,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t
*
parents
[
2
]
=
{
this
,
session
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
bool
conflate
=
const
bool
conflate
=
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
...
...
@@ -968,7 +965,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI
paddr
->
to_string
(
_last_endpoint
);
add_endpoint
(
addr_
,
(
own_t
*
)
session
,
newpipe
);
add_endpoint
(
addr_
,
static_cast
<
own_t
*>
(
session
)
,
newpipe
);
return
0
;
}
...
...
@@ -1019,7 +1016,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
if
(
unregister_endpoint
(
addr_str
,
this
)
==
0
)
{
return
0
;
}
std
::
pair
<
inprocs_t
::
iterator
,
inprocs_t
::
iterator
>
range
=
const
std
::
pair
<
inprocs_t
::
iterator
,
inprocs_t
::
iterator
>
range
=
_inprocs
.
equal_range
(
addr_str
);
if
(
range
.
first
==
range
.
second
)
{
errno
=
ENOENT
;
...
...
@@ -1126,7 +1123,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
int
timeout
=
options
.
sndtimeo
;
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
const
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
...
...
@@ -1218,7 +1215,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
int
timeout
=
options
.
rcvtimeo
;
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
const
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
...
...
@@ -1715,7 +1712,7 @@ void zmq::socket_base_t::event (const std::string &addr_,
// Send a monitor event
void
zmq
::
socket_base_t
::
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
)
const
std
::
string
&
addr_
)
const
{
// this is a private method which is only called from
// contexts where the mutex has been locked before
...
...
@@ -1805,7 +1802,7 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
return
res
;
}
bool
zmq
::
routing_socket_base_t
::
connect_routing_id_is_set
()
bool
zmq
::
routing_socket_base_t
::
connect_routing_id_is_set
()
const
{
return
!
_connect_routing_id
.
empty
();
}
...
...
@@ -1838,7 +1835,7 @@ const zmq::routing_socket_base_t::out_pipe_t *
zmq
::
routing_socket_base_t
::
lookup_out_pipe
(
const
blob_t
&
routing_id_
)
const
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id_
);
const
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id_
);
return
it
==
_out_pipes
.
end
()
?
NULL
:
&
it
->
second
;
}
...
...
src/socket_base.hpp
View file @
bbae67df
...
...
@@ -63,7 +63,7 @@ class socket_base_t : public own_t,
public
:
// Returns false if object is not a socket.
bool
check_tag
();
bool
check_tag
()
const
;
// Returns whether the socket is thread-safe.
bool
is_thread_safe
()
const
;
...
...
@@ -73,7 +73,7 @@ class socket_base_t : public own_t,
create
(
int
type_
,
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
);
// Returns the mailbox associated with this socket.
i_mailbox
*
get_mailbox
();
i_mailbox
*
get_mailbox
()
const
;
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
...
...
@@ -190,7 +190,8 @@ class socket_base_t : public own_t,
void
event
(
const
std
::
string
&
addr_
,
intptr_t
value_
,
int
type_
);
// Socket event data dispatch
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
);
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
)
const
;
// Monitor socket cleanup
void
stop_monitor
(
bool
send_monitor_stopped_event_
=
true
);
...
...
@@ -227,12 +228,12 @@ class socket_base_t : public own_t,
bool
_destroyed
;
// Parse URI string.
int
static
int
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
address_
);
// Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
int
check_protocol
(
const
std
::
string
&
protocol_
);
int
check_protocol
(
const
std
::
string
&
protocol_
)
const
;
// Register the pipe with this socket.
void
attach_pipe
(
zmq
::
pipe_t
*
pipe_
,
...
...
@@ -314,7 +315,7 @@ class routing_socket_base_t : public socket_base_t
// own methods
std
::
string
extract_connect_routing_id
();
bool
connect_routing_id_is_set
();
bool
connect_routing_id_is_set
()
const
;
struct
out_pipe_t
{
...
...
src/socks.cpp
View file @
bbae67df
...
...
@@ -33,6 +33,7 @@
#include "err.hpp"
#include "socks.hpp"
#include "tcp.hpp"
#include "blob.hpp"
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/socket.h>
...
...
@@ -132,7 +133,7 @@ zmq::socks_request_t::socks_request_t (uint8_t command_,
std
::
string
hostname_
,
uint16_t
port_
)
:
command
(
command_
),
hostname
(
hostname_
),
hostname
(
ZMQ_MOVE
(
hostname_
)
),
port
(
port_
)
{
zmq_assert
(
hostname_
.
size
()
<=
UINT8_MAX
);
...
...
src/tcp.cpp
View file @
bbae67df
...
...
@@ -236,7 +236,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
return
nbytes
;
#else
ssize_t
nbytes
=
send
(
s_
,
(
char
*
)
data_
,
size_
,
0
);
ssize_t
nbytes
=
send
(
s_
,
static_cast
<
const
char
*>
(
data_
)
,
size_
,
0
);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
...
...
src/tcp_address.cpp
View file @
bbae67df
...
...
@@ -308,7 +308,7 @@ bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss_,
mask
=
_address_mask
;
const
size_t
full_bytes
=
mask
/
8
;
if
(
memcmp
(
our_bytes
,
their_bytes
,
full_bytes
))
if
(
memcmp
(
our_bytes
,
their_bytes
,
full_bytes
)
!=
0
)
return
false
;
const
uint8_t
last_byte_bits
=
0xffU
<<
(
8
-
mask
%
8
);
...
...
src/trie.cpp
View file @
bbae67df
...
...
@@ -62,7 +62,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
return
_refcnt
==
1
;
}
unsigned
char
c
=
*
prefix_
;
const
unsigned
char
c
=
*
prefix_
;
if
(
c
<
_min
||
c
>=
_min
+
_count
)
{
// The character is out of range of currently handled
// characters. We have to extend the table.
...
...
@@ -71,7 +71,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
_count
=
1
;
_next
.
node
=
NULL
;
}
else
if
(
_count
==
1
)
{
unsigned
char
oldc
=
_min
;
const
unsigned
char
oldc
=
_min
;
trie_t
*
oldp
=
_next
.
node
;
_count
=
(
_min
<
c
?
c
-
_min
:
_min
-
c
)
+
1
;
_next
.
table
=
...
...
@@ -83,7 +83,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
_next
.
table
[
oldc
-
_min
]
=
oldp
;
}
else
if
(
_min
<
c
)
{
// The new character is above the current character range.
unsigned
short
old_count
=
_count
;
const
unsigned
short
old_count
=
_count
;
_count
=
c
-
_min
+
1
;
_next
.
table
=
static_cast
<
trie_t
**>
(
realloc
(
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
...
...
@@ -92,10 +92,10 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
_next
.
table
[
i
]
=
NULL
;
}
else
{
// The new character is below the current character range.
unsigned
short
old_count
=
_count
;
const
unsigned
short
old_count
=
_count
;
_count
=
(
_min
+
old_count
)
-
c
;
_next
.
table
=
static_cast
<
trie_t
**>
(
realloc
(
(
void
*
)
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
realloc
(
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
zmq_assert
(
_next
.
table
);
memmove
(
_next
.
table
+
_min
-
c
,
_next
.
table
,
old_count
*
sizeof
(
trie_t
*
));
...
...
@@ -133,7 +133,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
_refcnt
--
;
return
_refcnt
==
0
;
}
unsigned
char
c
=
*
prefix_
;
const
unsigned
char
c
=
*
prefix_
;
if
(
!
_count
||
c
<
_min
||
c
>=
_min
+
_count
)
return
false
;
...
...
@@ -142,7 +142,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
if
(
!
next_node
)
return
false
;
bool
ret
=
next_node
->
rm
(
prefix_
+
1
,
size_
-
1
);
const
bool
ret
=
next_node
->
rm
(
prefix_
+
1
,
size_
-
1
);
// Prune redundant nodes
if
(
next_node
->
is_redundant
())
{
...
...
@@ -252,7 +252,7 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
// If there's no corresponding slot for the first character
// of the prefix, the message does not match.
unsigned
char
c
=
*
data_
;
const
unsigned
char
c
=
*
data_
;
if
(
c
<
current
->
_min
||
c
>=
current
->
_min
+
current
->
_count
)
return
false
;
...
...
@@ -283,7 +283,7 @@ void zmq::trie_t::apply_helper (unsigned char **buff_,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
)
void
*
arg_
)
const
{
// If this node is a subscription, apply the function.
if
(
_refcnt
)
...
...
src/trie.hpp
View file @
bbae67df
...
...
@@ -64,7 +64,7 @@ class trie_t
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
);
void
*
arg_
)
const
;
bool
is_redundant
()
const
;
uint32_t
_refcnt
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment