Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0b1589db
Unverified
Commit
0b1589db
authored
Aug 10, 2018
by
Luca Boccassi
Committed by
GitHub
Aug 10, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3209 from sigiesec/code-improvements
Code style improvements
parents
6e8424ab
bbae67df
Show whitespace changes
Inline
Side-by-side
Showing
29 changed files
with
152 additions
and
159 deletions
+152
-159
atomic_counter.hpp
src/atomic_counter.hpp
+1
-8
atomic_ptr.hpp
src/atomic_ptr.hpp
+2
-8
blob.hpp
src/blob.hpp
+4
-4
dealer.cpp
src/dealer.cpp
+3
-3
err.cpp
src/err.cpp
+2
-1
ip.cpp
src/ip.cpp
+1
-1
ipc_address.cpp
src/ipc_address.cpp
+2
-2
ipc_address.hpp
src/ipc_address.hpp
+1
-1
ipc_listener.cpp
src/ipc_listener.cpp
+12
-14
ipc_listener.hpp
src/ipc_listener.hpp
+1
-1
macros.hpp
src/macros.hpp
+8
-0
null_mechanism.cpp
src/null_mechanism.cpp
+4
-7
pipe.cpp
src/pipe.cpp
+17
-18
pipe.hpp
src/pipe.hpp
+6
-6
plain_client.cpp
src/plain_client.cpp
+5
-3
proxy.cpp
src/proxy.cpp
+14
-17
random.cpp
src/random.cpp
+2
-0
raw_decoder.cpp
src/raw_decoder.cpp
+3
-3
raw_decoder.hpp
src/raw_decoder.hpp
+1
-1
router.cpp
src/router.cpp
+6
-3
router.hpp
src/router.hpp
+2
-2
signaler.cpp
src/signaler.cpp
+5
-5
socket_base.cpp
src/socket_base.cpp
+28
-31
socket_base.hpp
src/socket_base.hpp
+7
-6
socks.cpp
src/socks.cpp
+2
-1
tcp.cpp
src/tcp.cpp
+1
-1
tcp_address.cpp
src/tcp_address.cpp
+1
-1
trie.cpp
src/trie.cpp
+10
-10
trie.hpp
src/trie.hpp
+1
-1
No files found.
src/atomic_counter.hpp
View file @
0b1589db
...
@@ -31,6 +31,7 @@
...
@@ -31,6 +31,7 @@
#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__
#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__
#include "stdint.hpp"
#include "stdint.hpp"
#include "macros.hpp"
#if defined ZMQ_FORCE_MUTEXES
#if defined ZMQ_FORCE_MUTEXES
#define ZMQ_ATOMIC_COUNTER_MUTEX
#define ZMQ_ATOMIC_COUNTER_MUTEX
...
@@ -66,14 +67,6 @@
...
@@ -66,14 +67,6 @@
#include <arch/atomic.h>
#include <arch/atomic.h>
#endif
#endif
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
namespace
zmq
namespace
zmq
{
{
// This class represents an integer that can be incremented/decremented
// This class represents an integer that can be incremented/decremented
...
...
src/atomic_ptr.hpp
View file @
0b1589db
...
@@ -30,6 +30,8 @@
...
@@ -30,6 +30,8 @@
#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__
#include "macros.hpp"
#if defined ZMQ_FORCE_MUTEXES
#if defined ZMQ_FORCE_MUTEXES
#define ZMQ_ATOMIC_PTR_MUTEX
#define ZMQ_ATOMIC_PTR_MUTEX
#elif defined ZMQ_HAVE_ATOMIC_INTRINSICS
#elif defined ZMQ_HAVE_ATOMIC_INTRINSICS
...
@@ -64,14 +66,6 @@
...
@@ -64,14 +66,6 @@
#include <arch/atomic.h>
#include <arch/atomic.h>
#endif
#endif
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
namespace
zmq
namespace
zmq
{
{
#if !defined ZMQ_ATOMIC_PTR_CXX11
#if !defined ZMQ_ATOMIC_PTR_CXX11
...
...
src/blob.hpp
View file @
0b1589db
...
@@ -30,6 +30,7 @@
...
@@ -30,6 +30,7 @@
#ifndef __ZMQ_BLOB_HPP_INCLUDED__
#ifndef __ZMQ_BLOB_HPP_INCLUDED__
#define __ZMQ_BLOB_HPP_INCLUDED__
#define __ZMQ_BLOB_HPP_INCLUDED__
#include "macros.hpp"
#include "err.hpp"
#include "err.hpp"
#include <stdlib.h>
#include <stdlib.h>
...
@@ -116,7 +117,7 @@ struct blob_t
...
@@ -116,7 +117,7 @@ struct blob_t
// Defines an order relationship on blob_t.
// Defines an order relationship on blob_t.
bool
operator
<
(
blob_t
const
&
other_
)
const
bool
operator
<
(
blob_t
const
&
other_
)
const
{
{
int
cmpres
=
const
int
cmpres
=
memcmp
(
_data
,
other_
.
_data
,
std
::
min
(
_size
,
other_
.
_size
));
memcmp
(
_data
,
other_
.
_data
,
std
::
min
(
_size
,
other_
.
_size
));
return
cmpres
<
0
||
(
cmpres
==
0
&&
_size
<
other_
.
_size
);
return
cmpres
<
0
||
(
cmpres
==
0
&&
_size
<
other_
.
_size
);
}
}
...
@@ -164,14 +165,13 @@ struct blob_t
...
@@ -164,14 +165,13 @@ struct blob_t
blob_t
(
const
blob_t
&
)
=
delete
;
blob_t
(
const
blob_t
&
)
=
delete
;
blob_t
&
operator
=
(
const
blob_t
&
)
=
delete
;
blob_t
&
operator
=
(
const
blob_t
&
)
=
delete
;
blob_t
(
blob_t
&&
other_
)
:
blob_t
(
blob_t
&&
other_
)
ZMQ_NOEXCEPT
:
_data
(
other_
.
_data
),
_data
(
other_
.
_data
),
_size
(
other_
.
_size
),
_size
(
other_
.
_size
),
_owned
(
other_
.
_owned
)
_owned
(
other_
.
_owned
)
{
{
other_
.
_owned
=
false
;
other_
.
_owned
=
false
;
}
}
blob_t
&
operator
=
(
blob_t
&&
other_
)
blob_t
&
operator
=
(
blob_t
&&
other_
)
ZMQ_NOEXCEPT
{
{
if
(
this
!=
&
other_
)
{
if
(
this
!=
&
other_
)
{
clear
();
clear
();
...
...
src/dealer.cpp
View file @
0b1589db
...
@@ -46,10 +46,10 @@ zmq::dealer_t::~dealer_t ()
...
@@ -46,10 +46,10 @@ zmq::dealer_t::~dealer_t ()
void
zmq
::
dealer_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
void
zmq
::
dealer_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
bool
subscribe_to_all_
,
bool
subscribe_to_all_
,
bool
locally_initated_
)
bool
locally_init
i
ated_
)
{
{
LIBZMQ_UNUSED
(
subscribe_to_all_
);
LIBZMQ_UNUSED
(
subscribe_to_all_
);
LIBZMQ_UNUSED
(
locally_initated_
);
LIBZMQ_UNUSED
(
locally_init
i
ated_
);
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
...
@@ -60,7 +60,7 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_,
...
@@ -60,7 +60,7 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_,
rc
=
pipe_
->
write
(
&
probe_msg
);
rc
=
pipe_
->
write
(
&
probe_msg
);
// zmq_assert (rc) is not applicable here, since it is not a bug.
// zmq_assert (rc) is not applicable here, since it is not a bug.
(
void
)
rc
;
LIBZMQ_UNUSED
(
rc
)
;
pipe_
->
flush
();
pipe_
->
flush
();
...
...
src/err.cpp
View file @
0b1589db
...
@@ -29,6 +29,7 @@
...
@@ -29,6 +29,7 @@
#include "precompiled.hpp"
#include "precompiled.hpp"
#include "err.hpp"
#include "err.hpp"
#include "macros.hpp"
const
char
*
zmq
::
errno_to_string
(
int
errno_
)
const
char
*
zmq
::
errno_to_string
(
int
errno_
)
{
{
...
@@ -82,7 +83,7 @@ void zmq::zmq_abort (const char *errmsg_)
...
@@ -82,7 +83,7 @@ void zmq::zmq_abort (const char *errmsg_)
extra_info
[
0
]
=
(
ULONG_PTR
)
errmsg_
;
extra_info
[
0
]
=
(
ULONG_PTR
)
errmsg_
;
RaiseException
(
0x40000015
,
EXCEPTION_NONCONTINUABLE
,
1
,
extra_info
);
RaiseException
(
0x40000015
,
EXCEPTION_NONCONTINUABLE
,
1
,
extra_info
);
#else
#else
(
void
)
errmsg_
;
LIBZMQ_UNUSED
(
errmsg_
)
;
print_backtrace
();
print_backtrace
();
abort
();
abort
();
#endif
#endif
...
...
src/ip.cpp
View file @
0b1589db
...
@@ -125,7 +125,7 @@ void zmq::unblock_socket (fd_t s_)
...
@@ -125,7 +125,7 @@ void zmq::unblock_socket (fd_t s_)
void
zmq
::
enable_ipv4_mapping
(
fd_t
s_
)
void
zmq
::
enable_ipv4_mapping
(
fd_t
s_
)
{
{
(
void
)
s_
;
LIBZMQ_UNUSED
(
s_
)
;
#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD
#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
...
...
src/ipc_address.cpp
View file @
0b1589db
...
@@ -75,7 +75,7 @@ int zmq::ipc_address_t::resolve (const char *path_)
...
@@ -75,7 +75,7 @@ int zmq::ipc_address_t::resolve (const char *path_)
return
0
;
return
0
;
}
}
int
zmq
::
ipc_address_t
::
to_string
(
std
::
string
&
addr_
)
int
zmq
::
ipc_address_t
::
to_string
(
std
::
string
&
addr_
)
const
{
{
if
(
address
.
sun_family
!=
AF_UNIX
)
{
if
(
address
.
sun_family
!=
AF_UNIX
)
{
addr_
.
clear
();
addr_
.
clear
();
...
@@ -94,7 +94,7 @@ int zmq::ipc_address_t::to_string (std::string &addr_)
...
@@ -94,7 +94,7 @@ int zmq::ipc_address_t::to_string (std::string &addr_)
const
sockaddr
*
zmq
::
ipc_address_t
::
addr
()
const
const
sockaddr
*
zmq
::
ipc_address_t
::
addr
()
const
{
{
return
(
sockaddr
*
)
&
address
;
return
reinterpret_cast
<
const
sockaddr
*>
(
&
address
)
;
}
}
socklen_t
zmq
::
ipc_address_t
::
addrlen
()
const
socklen_t
zmq
::
ipc_address_t
::
addrlen
()
const
...
...
src/ipc_address.hpp
View file @
0b1589db
...
@@ -51,7 +51,7 @@ class ipc_address_t
...
@@ -51,7 +51,7 @@ class ipc_address_t
int
resolve
(
const
char
*
path_
);
int
resolve
(
const
char
*
path_
);
// The opposite to resolve()
// The opposite to resolve()
int
to_string
(
std
::
string
&
addr_
);
int
to_string
(
std
::
string
&
addr_
)
const
;
const
sockaddr
*
addr
()
const
;
const
sockaddr
*
addr
()
const
;
socklen_t
addrlen
()
const
;
socklen_t
addrlen
()
const
;
...
...
src/ipc_listener.cpp
View file @
0b1589db
...
@@ -115,11 +115,9 @@ int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
...
@@ -115,11 +115,9 @@ int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
}
}
path_
.
assign
(
&
buffer
[
0
]);
path_
.
assign
(
&
buffer
[
0
]);
file_
.
assign
(
path_
+
"/socket"
)
;
file_
=
path_
+
"/socket"
;
#else
#else
// Silence -Wunused-parameter. #pragma and __attribute__((unused)) are not
LIBZMQ_UNUSED
(
path_
);
// very portable unfortunately...
(
void
)
path_
;
int
fd
=
mkstemp
(
&
buffer
[
0
]);
int
fd
=
mkstemp
(
&
buffer
[
0
]);
if
(
fd
==
-
1
)
if
(
fd
==
-
1
)
return
-
1
;
return
-
1
;
...
@@ -138,7 +136,7 @@ zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
...
@@ -138,7 +136,7 @@ zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
io_object_t
(
io_thread_
),
io_object_t
(
io_thread_
),
has_file
(
false
),
has_file
(
false
),
s
(
retired_fd
),
s
(
retired_fd
),
handle
(
(
handle_t
)
NULL
),
handle
(
static_cast
<
handle_t
>
(
NULL
)
),
socket
(
socket_
)
socket
(
socket_
)
{
{
}
}
...
@@ -239,10 +237,10 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
...
@@ -239,10 +237,10 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
if
(
!
tmp_socket_dirname
.
empty
())
{
if
(
!
tmp_socket_dirname
.
empty
())
{
// We need to preserve errno to return to the user
// We need to preserve errno to return to the user
int
errno_
=
errno
;
int
tmp_errno
=
errno
;
::
rmdir
(
tmp_socket_dirname
.
c_str
());
::
rmdir
(
tmp_socket_dirname
.
c_str
());
tmp_socket_dirname
.
clear
();
tmp_socket_dirname
.
clear
();
errno
=
errno_
;
errno
=
tmp_errno
;
}
}
return
-
1
;
return
-
1
;
}
}
...
@@ -257,10 +255,10 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
...
@@ -257,10 +255,10 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if
(
s
==
-
1
)
{
if
(
s
==
-
1
)
{
if
(
!
tmp_socket_dirname
.
empty
())
{
if
(
!
tmp_socket_dirname
.
empty
())
{
// We need to preserve errno to return to the user
// We need to preserve errno to return to the user
int
errno_
=
errno
;
int
tmp_errno
=
errno
;
::
rmdir
(
tmp_socket_dirname
.
c_str
());
::
rmdir
(
tmp_socket_dirname
.
c_str
());
tmp_socket_dirname
.
clear
();
tmp_socket_dirname
.
clear
();
errno
=
errno_
;
errno
=
tmp_errno
;
}
}
return
-
1
;
return
-
1
;
}
}
...
@@ -277,7 +275,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
...
@@ -277,7 +275,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
goto
error
;
goto
error
;
}
}
filename
.
assign
(
addr
.
c_str
()
);
filename
=
ZMQ_MOVE
(
addr
);
has_file
=
true
;
has_file
=
true
;
socket
->
event_listening
(
endpoint
,
s
);
socket
->
event_listening
(
endpoint
,
s
);
...
@@ -319,7 +317,7 @@ int zmq::ipc_listener_t::close ()
...
@@ -319,7 +317,7 @@ int zmq::ipc_listener_t::close ()
#if defined ZMQ_HAVE_SO_PEERCRED
#if defined ZMQ_HAVE_SO_PEERCRED
bool
zmq
::
ipc_listener_t
::
filter
(
fd_t
sock
)
bool
zmq
::
ipc_listener_t
::
filter
(
fd_t
sock
_
)
{
{
if
(
options
.
ipc_uid_accept_filters
.
empty
()
if
(
options
.
ipc_uid_accept_filters
.
empty
()
&&
options
.
ipc_pid_accept_filters
.
empty
()
&&
options
.
ipc_pid_accept_filters
.
empty
()
...
@@ -329,7 +327,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
...
@@ -329,7 +327,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
struct
ucred
cred
;
struct
ucred
cred
;
socklen_t
size
=
sizeof
(
cred
);
socklen_t
size
=
sizeof
(
cred
);
if
(
getsockopt
(
sock
,
SOL_SOCKET
,
SO_PEERCRED
,
&
cred
,
&
size
))
if
(
getsockopt
(
sock
_
,
SOL_SOCKET
,
SO_PEERCRED
,
&
cred
,
&
size
))
return
false
;
return
false
;
if
(
options
.
ipc_uid_accept_filters
.
find
(
cred
.
uid
)
if
(
options
.
ipc_uid_accept_filters
.
find
(
cred
.
uid
)
!=
options
.
ipc_uid_accept_filters
.
end
()
!=
options
.
ipc_uid_accept_filters
.
end
()
...
@@ -359,7 +357,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
...
@@ -359,7 +357,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
bool
zmq
::
ipc_listener_t
::
filter
(
fd_t
sock
)
bool
zmq
::
ipc_listener_t
::
filter
(
fd_t
sock
_
)
{
{
if
(
options
.
ipc_uid_accept_filters
.
empty
()
if
(
options
.
ipc_uid_accept_filters
.
empty
()
&&
options
.
ipc_gid_accept_filters
.
empty
())
&&
options
.
ipc_gid_accept_filters
.
empty
())
...
@@ -368,7 +366,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
...
@@ -368,7 +366,7 @@ bool zmq::ipc_listener_t::filter (fd_t sock)
struct
xucred
cred
;
struct
xucred
cred
;
socklen_t
size
=
sizeof
(
cred
);
socklen_t
size
=
sizeof
(
cred
);
if
(
getsockopt
(
sock
,
0
,
LOCAL_PEERCRED
,
&
cred
,
&
size
))
if
(
getsockopt
(
sock
_
,
0
,
LOCAL_PEERCRED
,
&
cred
,
&
size
))
return
false
;
return
false
;
if
(
cred
.
cr_version
!=
XUCRED_VERSION
)
if
(
cred
.
cr_version
!=
XUCRED_VERSION
)
return
false
;
return
false
;
...
...
src/ipc_listener.hpp
View file @
0b1589db
...
@@ -76,7 +76,7 @@ class ipc_listener_t : public own_t, public io_object_t
...
@@ -76,7 +76,7 @@ class ipc_listener_t : public own_t, public io_object_t
// Filter new connections if the OS provides a mechanism to get
// Filter new connections if the OS provides a mechanism to get
// the credentials of the peer process. Called from accept().
// the credentials of the peer process. Called from accept().
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
bool
filter
(
fd_t
sock
);
bool
filter
(
fd_t
sock
_
);
#endif
#endif
// Accept the new connection. Returns the file descriptor of the
// Accept the new connection. Returns the file descriptor of the
...
...
src/macros.hpp
View file @
0b1589db
...
@@ -11,3 +11,11 @@
...
@@ -11,3 +11,11 @@
}
}
/******************************************************************************/
/******************************************************************************/
#if !defined ZMQ_NOEXCEPT
#if defined ZMQ_HAVE_NOEXCEPT
#define ZMQ_NOEXCEPT noexcept
#else
#define ZMQ_NOEXCEPT
#endif
#endif
src/null_mechanism.cpp
View file @
0b1589db
...
@@ -215,16 +215,13 @@ int zmq::null_mechanism_t::zap_msg_available ()
...
@@ -215,16 +215,13 @@ int zmq::null_mechanism_t::zap_msg_available ()
zmq
::
mechanism_t
::
status_t
zmq
::
null_mechanism_t
::
status
()
const
zmq
::
mechanism_t
::
status_t
zmq
::
null_mechanism_t
::
status
()
const
{
{
if
(
_ready_command_sent
&&
_ready_command_received
)
return
ready
;
const
bool
command_sent
=
_ready_command_sent
||
_error_command_sent
;
const
bool
command_sent
=
_ready_command_sent
||
_error_command_sent
;
const
bool
command_received
=
const
bool
command_received
=
_ready_command_received
||
_error_command_received
;
_ready_command_received
||
_error_command_received
;
return
command_sent
&&
command_received
?
error
:
handshaking
;
if
(
_ready_command_sent
&&
_ready_command_received
)
return
mechanism_t
::
ready
;
if
(
command_sent
&&
command_received
)
return
error
;
else
return
handshaking
;
}
}
void
zmq
::
null_mechanism_t
::
send_zap_request
()
void
zmq
::
null_mechanism_t
::
send_zap_request
()
...
...
src/pipe.cpp
View file @
0b1589db
...
@@ -127,7 +127,7 @@ void zmq::pipe_t::set_server_socket_routing_id (
...
@@ -127,7 +127,7 @@ void zmq::pipe_t::set_server_socket_routing_id (
_server_socket_routing_id
=
server_socket_routing_id_
;
_server_socket_routing_id
=
server_socket_routing_id_
;
}
}
uint32_t
zmq
::
pipe_t
::
get_server_socket_routing_id
()
uint32_t
zmq
::
pipe_t
::
get_server_socket_routing_id
()
const
{
{
return
_server_socket_routing_id
;
return
_server_socket_routing_id
;
}
}
...
@@ -138,7 +138,7 @@ void zmq::pipe_t::set_router_socket_routing_id (
...
@@ -138,7 +138,7 @@ void zmq::pipe_t::set_router_socket_routing_id (
_router_socket_routing_id
.
set_deep_copy
(
router_socket_routing_id_
);
_router_socket_routing_id
.
set_deep_copy
(
router_socket_routing_id_
);
}
}
const
zmq
::
blob_t
&
zmq
::
pipe_t
::
get_routing_id
()
const
zmq
::
blob_t
&
zmq
::
pipe_t
::
get_routing_id
()
const
{
{
return
_router_socket_routing_id
;
return
_router_socket_routing_id
;
}
}
...
@@ -165,7 +165,7 @@ bool zmq::pipe_t::check_read ()
...
@@ -165,7 +165,7 @@ bool zmq::pipe_t::check_read ()
// initiate termination process.
// initiate termination process.
if
(
_in_pipe
->
probe
(
is_delimiter
))
{
if
(
_in_pipe
->
probe
(
is_delimiter
))
{
msg_t
msg
;
msg_t
msg
;
bool
ok
=
_in_pipe
->
read
(
&
msg
);
const
bool
ok
=
_in_pipe
->
read
(
&
msg
);
zmq_assert
(
ok
);
zmq_assert
(
ok
);
process_delimiter
();
process_delimiter
();
return
false
;
return
false
;
...
@@ -218,7 +218,7 @@ bool zmq::pipe_t::check_write ()
...
@@ -218,7 +218,7 @@ bool zmq::pipe_t::check_write ()
if
(
unlikely
(
!
_out_active
||
_state
!=
active
))
if
(
unlikely
(
!
_out_active
||
_state
!=
active
))
return
false
;
return
false
;
bool
full
=
!
check_hwm
();
const
bool
full
=
!
check_hwm
();
if
(
unlikely
(
full
))
{
if
(
unlikely
(
full
))
{
_out_active
=
false
;
_out_active
=
false
;
...
@@ -233,7 +233,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
...
@@ -233,7 +233,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if
(
unlikely
(
!
check_write
()))
if
(
unlikely
(
!
check_write
()))
return
false
;
return
false
;
bool
more
=
(
msg_
->
flags
()
&
msg_t
::
more
)
!=
0
;
const
bool
more
=
(
msg_
->
flags
()
&
msg_t
::
more
)
!=
0
;
const
bool
is_routing_id
=
msg_
->
is_routing_id
();
const
bool
is_routing_id
=
msg_
->
is_routing_id
();
_out_pipe
->
write
(
*
msg_
,
more
);
_out_pipe
->
write
(
*
msg_
,
more
);
if
(
!
more
&&
!
is_routing_id
)
if
(
!
more
&&
!
is_routing_id
)
...
@@ -242,14 +242,14 @@ bool zmq::pipe_t::write (msg_t *msg_)
...
@@ -242,14 +242,14 @@ bool zmq::pipe_t::write (msg_t *msg_)
return
true
;
return
true
;
}
}
void
zmq
::
pipe_t
::
rollback
()
void
zmq
::
pipe_t
::
rollback
()
const
{
{
// Remove incomplete message from the outbound pipe.
// Remove incomplete message from the outbound pipe.
msg_t
msg
;
msg_t
msg
;
if
(
_out_pipe
)
{
if
(
_out_pipe
)
{
while
(
_out_pipe
->
unwrite
(
&
msg
))
{
while
(
_out_pipe
->
unwrite
(
&
msg
))
{
zmq_assert
(
msg
.
flags
()
&
msg_t
::
more
);
zmq_assert
(
msg
.
flags
()
&
msg_t
::
more
);
int
rc
=
msg
.
close
();
const
int
rc
=
msg
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
}
}
...
@@ -294,7 +294,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
...
@@ -294,7 +294,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
while
(
_out_pipe
->
read
(
&
msg
))
{
while
(
_out_pipe
->
read
(
&
msg
))
{
if
(
!
(
msg
.
flags
()
&
msg_t
::
more
))
if
(
!
(
msg
.
flags
()
&
msg_t
::
more
))
_msgs_written
--
;
_msgs_written
--
;
int
rc
=
msg
.
close
();
const
int
rc
=
msg
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
LIBZMQ_DELETE
(
_out_pipe
);
LIBZMQ_DELETE
(
_out_pipe
);
...
@@ -372,7 +372,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
...
@@ -372,7 +372,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
if
(
!
_conflate
)
{
if
(
!
_conflate
)
{
msg_t
msg
;
msg_t
msg
;
while
(
_in_pipe
->
read
(
&
msg
))
{
while
(
_in_pipe
->
read
(
&
msg
))
{
int
rc
=
msg
.
close
();
const
int
rc
=
msg
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
}
}
...
@@ -409,7 +409,7 @@ void zmq::pipe_t::terminate (bool delay_)
...
@@ -409,7 +409,7 @@ void zmq::pipe_t::terminate (bool delay_)
}
}
// The simple sync termination case. Ask the peer to terminate and wait
// The simple sync termination case. Ask the peer to terminate and wait
// for the ack.
// for the ack.
else
if
(
_state
==
active
)
{
if
(
_state
==
active
)
{
send_pipe_term
(
_peer
);
send_pipe_term
(
_peer
);
_state
=
term_req_sent1
;
_state
=
term_req_sent1
;
}
}
...
@@ -476,7 +476,7 @@ int zmq::pipe_t::compute_lwm (int hwm_)
...
@@ -476,7 +476,7 @@ int zmq::pipe_t::compute_lwm (int hwm_)
// Given the 3. it would be good to keep HWM and LWM as far apart as
// Given the 3. it would be good to keep HWM and LWM as far apart as
// possible to reduce the thread switching overhead to almost zero.
// possible to reduce the thread switching overhead to almost zero.
// Let's make LWM 1/2 of HWM.
// Let's make LWM 1/2 of HWM.
int
result
=
(
hwm_
+
1
)
/
2
;
const
int
result
=
(
hwm_
+
1
)
/
2
;
return
result
;
return
result
;
}
}
...
@@ -502,20 +502,18 @@ void zmq::pipe_t::hiccup ()
...
@@ -502,20 +502,18 @@ void zmq::pipe_t::hiccup ()
// We'll drop the pointer to the inpipe. From now on, the peer is
// We'll drop the pointer to the inpipe. From now on, the peer is
// responsible for deallocating it.
// responsible for deallocating it.
_in_pipe
=
NULL
;
// Create new inpipe.
// Create new inpipe.
if
(
_conflate
)
_in_pipe
=
new
(
std
::
nothrow
)
ypipe_conflate_t
<
msg_t
>
();
else
_in_pipe
=
_in_pipe
=
new
(
std
::
nothrow
)
ypipe_t
<
msg_t
,
message_pipe_granularity
>
();
_conflate
?
static_cast
<
upipe_t
*>
(
new
(
std
::
nothrow
)
ypipe_conflate_t
<
msg_t
>
())
:
new
(
std
::
nothrow
)
ypipe_t
<
msg_t
,
message_pipe_granularity
>
();
alloc_assert
(
_in_pipe
);
alloc_assert
(
_in_pipe
);
_in_active
=
true
;
_in_active
=
true
;
// Notify the peer about the hiccup.
// Notify the peer about the hiccup.
send_hiccup
(
_peer
,
(
void
*
)
_in_pipe
);
send_hiccup
(
_peer
,
_in_pipe
);
}
}
void
zmq
::
pipe_t
::
set_hwms
(
int
inhwm_
,
int
outhwm_
)
void
zmq
::
pipe_t
::
set_hwms
(
int
inhwm_
,
int
outhwm_
)
...
@@ -542,7 +540,8 @@ void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
...
@@ -542,7 +540,8 @@ void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
bool
zmq
::
pipe_t
::
check_hwm
()
const
bool
zmq
::
pipe_t
::
check_hwm
()
const
{
{
bool
full
=
_hwm
>
0
&&
_msgs_written
-
_peers_msgs_read
>=
uint64_t
(
_hwm
);
const
bool
full
=
_hwm
>
0
&&
_msgs_written
-
_peers_msgs_read
>=
uint64_t
(
_hwm
);
return
(
!
full
);
return
(
!
full
);
}
}
...
...
src/pipe.hpp
View file @
0b1589db
...
@@ -85,12 +85,12 @@ class pipe_t : public object_t,
...
@@ -85,12 +85,12 @@ class pipe_t : public object_t,
void
set_event_sink
(
i_pipe_events
*
sink_
);
void
set_event_sink
(
i_pipe_events
*
sink_
);
// Pipe endpoint can store an routing ID to be used by its clients.
// Pipe endpoint can store an routing ID to be used by its clients.
void
set_server_socket_routing_id
(
uint32_t
routing_id_
);
void
set_server_socket_routing_id
(
uint32_t
server_socket_
routing_id_
);
uint32_t
get_server_socket_routing_id
();
uint32_t
get_server_socket_routing_id
()
const
;
// Pipe endpoint can store an opaque ID to be used by its clients.
// Pipe endpoint can store an opaque ID to be used by its clients.
void
set_router_socket_routing_id
(
const
blob_t
&
identity
_
);
void
set_router_socket_routing_id
(
const
blob_t
&
router_socket_routing_id
_
);
const
blob_t
&
get_routing_id
();
const
blob_t
&
get_routing_id
()
const
;
const
blob_t
&
get_credential
()
const
;
const
blob_t
&
get_credential
()
const
;
...
@@ -111,7 +111,7 @@ class pipe_t : public object_t,
...
@@ -111,7 +111,7 @@ class pipe_t : public object_t,
bool
write
(
msg_t
*
msg_
);
bool
write
(
msg_t
*
msg_
);
// Remove unfinished parts of the outbound message from the pipe.
// Remove unfinished parts of the outbound message from the pipe.
void
rollback
();
void
rollback
()
const
;
// Flush the messages downstream.
// Flush the messages downstream.
void
flush
();
void
flush
();
...
@@ -168,7 +168,7 @@ class pipe_t : public object_t,
...
@@ -168,7 +168,7 @@ class pipe_t : public object_t,
// Pipepair uses this function to let us know about
// Pipepair uses this function to let us know about
// the peer pipe object.
// the peer pipe object.
void
set_peer
(
pipe_t
*
p
ipe
_
);
void
set_peer
(
pipe_t
*
p
eer
_
);
// Destructor is private. Pipe objects destroy themselves.
// Destructor is private. Pipe objects destroy themselves.
~
pipe_t
();
~
pipe_t
();
...
...
src/plain_client.cpp
View file @
0b1589db
...
@@ -105,12 +105,14 @@ int zmq::plain_client_t::process_handshake_command (msg_t *msg_)
...
@@ -105,12 +105,14 @@ int zmq::plain_client_t::process_handshake_command (msg_t *msg_)
zmq
::
mechanism_t
::
status_t
zmq
::
plain_client_t
::
status
()
const
zmq
::
mechanism_t
::
status_t
zmq
::
plain_client_t
::
status
()
const
{
{
if
(
_state
==
ready
)
switch
(
_state
)
{
case
ready
:
return
mechanism_t
::
ready
;
return
mechanism_t
::
ready
;
if
(
_state
==
error_command_received
)
case
error_command_received
:
return
mechanism_t
::
error
;
return
mechanism_t
::
error
;
else
default
:
return
mechanism_t
::
handshaking
;
return
mechanism_t
::
handshaking
;
}
}
}
void
zmq
::
plain_client_t
::
produce_hello
(
msg_t
*
msg_
)
const
void
zmq
::
plain_client_t
::
produce_hello
(
msg_t
*
msg_
)
const
...
...
src/proxy.cpp
View file @
0b1589db
...
@@ -90,7 +90,7 @@ typedef struct
...
@@ -90,7 +90,7 @@ typedef struct
// Utility functions
// Utility functions
int
capture
(
class
zmq
::
socket_base_t
*
capture_
,
int
capture
(
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
,
zmq
::
msg_t
*
msg_
,
int
more_
=
0
)
int
more_
=
0
)
{
{
// Copy message to capture socket if any
// Copy message to capture socket if any
...
@@ -99,7 +99,7 @@ int capture (class zmq::socket_base_t *capture_,
...
@@ -99,7 +99,7 @@ int capture (class zmq::socket_base_t *capture_,
int
rc
=
ctrl
.
init
();
int
rc
=
ctrl
.
init
();
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
rc
=
ctrl
.
copy
(
msg_
);
rc
=
ctrl
.
copy
(
*
msg_
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
rc
=
capture_
->
send
(
&
ctrl
,
more_
?
ZMQ_SNDMORE
:
0
);
rc
=
capture_
->
send
(
&
ctrl
,
more_
?
ZMQ_SNDMORE
:
0
);
...
@@ -114,17 +114,17 @@ int forward (class zmq::socket_base_t *from_,
...
@@ -114,17 +114,17 @@ int forward (class zmq::socket_base_t *from_,
class
zmq
::
socket_base_t
*
to_
,
class
zmq
::
socket_base_t
*
to_
,
zmq_socket_stats_t
*
to_stats_
,
zmq_socket_stats_t
*
to_stats_
,
class
zmq
::
socket_base_t
*
capture_
,
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
)
zmq
::
msg_t
*
msg_
)
{
{
int
more
;
int
more
;
size_t
moresz
;
size_t
moresz
;
size_t
complete_msg_size
=
0
;
size_t
complete_msg_size
=
0
;
while
(
true
)
{
while
(
true
)
{
int
rc
=
from_
->
recv
(
&
msg_
,
0
);
int
rc
=
from_
->
recv
(
msg_
,
0
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
complete_msg_size
+=
msg_
.
size
();
complete_msg_size
+=
msg_
->
size
();
moresz
=
sizeof
more
;
moresz
=
sizeof
more
;
rc
=
from_
->
getsockopt
(
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
rc
=
from_
->
getsockopt
(
ZMQ_RCVMORE
,
&
more
,
&
moresz
);
...
@@ -136,7 +136,7 @@ int forward (class zmq::socket_base_t *from_,
...
@@ -136,7 +136,7 @@ int forward (class zmq::socket_base_t *from_,
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
rc
=
to_
->
send
(
&
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
rc
=
to_
->
send
(
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
...
@@ -163,7 +163,7 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
...
@@ -163,7 +163,7 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
// VSM of 8 bytes can't fail to init
// VSM of 8 bytes can't fail to init
msg
.
init_size
(
sizeof
(
uint64_t
));
msg
.
init_size
(
sizeof
(
uint64_t
));
memcpy
(
msg
.
data
(),
(
const
void
*
)
&
stat_
,
sizeof
(
uint64_t
));
memcpy
(
msg
.
data
(),
&
stat_
,
sizeof
(
uint64_t
));
// if the first message is handed to the pipe successfully then the HWM
// if the first message is handed to the pipe successfully then the HWM
// is not full, which means failures are due to interrupts (on Windows pipes
// is not full, which means failures are due to interrupts (on Windows pipes
...
@@ -366,11 +366,8 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -366,11 +366,8 @@ int zmq::proxy (class socket_base_t *frontend_,
}
}
}
}
int
i
;
bool
request_processed
,
reply_processed
;
bool
request_processed
,
reply_processed
;
while
(
state
!=
terminated
)
{
while
(
state
!=
terminated
)
{
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
...
@@ -387,7 +384,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -387,7 +384,7 @@ int zmq::proxy (class socket_base_t *frontend_,
CHECK_RC_EXIT_ON_FAILURE
();
CHECK_RC_EXIT_ON_FAILURE
();
// Process events.
// Process events.
for
(
i
=
0
;
i
<
rc
;
i
++
)
{
for
(
i
nt
i
=
0
;
i
<
rc
;
i
++
)
{
if
(
events
[
i
].
socket
==
frontend_
)
{
if
(
events
[
i
].
socket
==
frontend_
)
{
frontend_in
=
(
events
[
i
].
events
&
ZMQ_POLLIN
)
!=
0
;
frontend_in
=
(
events
[
i
].
events
&
ZMQ_POLLIN
)
!=
0
;
frontend_out
=
(
events
[
i
].
events
&
ZMQ_POLLOUT
)
!=
0
;
frontend_out
=
(
events
[
i
].
events
&
ZMQ_POLLOUT
)
!=
0
;
...
@@ -413,7 +410,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -413,7 +410,7 @@ int zmq::proxy (class socket_base_t *frontend_,
}
}
// Copy message to capture socket if any.
// Copy message to capture socket if any.
rc
=
capture
(
capture_
,
msg
);
rc
=
capture
(
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
CHECK_RC_EXIT_ON_FAILURE
();
if
(
msg
.
size
()
==
5
&&
memcmp
(
msg
.
data
(),
"PAUSE"
,
5
)
==
0
)
{
if
(
msg
.
size
()
==
5
&&
memcmp
(
msg
.
data
(),
"PAUSE"
,
5
)
==
0
)
{
...
@@ -452,7 +449,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -452,7 +449,7 @@ int zmq::proxy (class socket_base_t *frontend_,
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if
(
frontend_in
&&
(
backend_out
||
frontend_equal_to_backend
))
{
if
(
frontend_in
&&
(
backend_out
||
frontend_equal_to_backend
))
{
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
&
backend_stats
,
capture_
,
msg
);
&
backend_stats
,
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
CHECK_RC_EXIT_ON_FAILURE
();
request_processed
=
true
;
request_processed
=
true
;
frontend_in
=
backend_out
=
false
;
frontend_in
=
backend_out
=
false
;
...
@@ -465,7 +462,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -465,7 +462,7 @@ int zmq::proxy (class socket_base_t *frontend_,
// design in 'for' event processing loop.
// design in 'for' event processing loop.
if
(
backend_in
&&
frontend_out
)
{
if
(
backend_in
&&
frontend_out
)
{
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
&
frontend_stats
,
capture_
,
msg
);
&
frontend_stats
,
capture_
,
&
msg
);
CHECK_RC_EXIT_ON_FAILURE
();
CHECK_RC_EXIT_ON_FAILURE
();
reply_processed
=
true
;
reply_processed
=
true
;
backend_in
=
frontend_out
=
false
;
backend_in
=
frontend_out
=
false
;
...
@@ -595,7 +592,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -595,7 +592,7 @@ int zmq::proxy (class socket_base_t *frontend_,
return
close_and_return
(
&
msg
,
-
1
);
return
close_and_return
(
&
msg
,
-
1
);
// Copy message to capture socket if any
// Copy message to capture socket if any
rc
=
capture
(
capture_
,
msg
);
rc
=
capture
(
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
return
close_and_return
(
&
msg
,
-
1
);
...
@@ -628,7 +625,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -628,7 +625,7 @@ int zmq::proxy (class socket_base_t *frontend_,
if
(
state
==
active
&&
items
[
0
].
revents
&
ZMQ_POLLIN
if
(
state
==
active
&&
items
[
0
].
revents
&
ZMQ_POLLIN
&&
(
frontend_
==
backend_
||
itemsout
[
1
].
revents
&
ZMQ_POLLOUT
))
{
&&
(
frontend_
==
backend_
||
itemsout
[
1
].
revents
&
ZMQ_POLLOUT
))
{
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
&
backend_stats
,
rc
=
forward
(
frontend_
,
&
frontend_stats
,
backend_
,
&
backend_stats
,
capture_
,
msg
);
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
return
close_and_return
(
&
msg
,
-
1
);
}
}
...
@@ -637,7 +634,7 @@ int zmq::proxy (class socket_base_t *frontend_,
...
@@ -637,7 +634,7 @@ int zmq::proxy (class socket_base_t *frontend_,
&&
items
[
1
].
revents
&
ZMQ_POLLIN
&&
items
[
1
].
revents
&
ZMQ_POLLIN
&&
itemsout
[
0
].
revents
&
ZMQ_POLLOUT
)
{
&&
itemsout
[
0
].
revents
&
ZMQ_POLLOUT
)
{
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
&
frontend_stats
,
rc
=
forward
(
backend_
,
&
backend_stats
,
frontend_
,
&
frontend_stats
,
capture_
,
msg
);
capture_
,
&
msg
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
close_and_return
(
&
msg
,
-
1
);
return
close_and_return
(
&
msg
,
-
1
);
}
}
...
...
src/random.cpp
View file @
0b1589db
...
@@ -154,6 +154,8 @@ static void manage_random (bool init_)
...
@@ -154,6 +154,8 @@ static void manage_random (bool init_)
}
else
{
}
else
{
randombytes_close
();
randombytes_close
();
}
}
#else
LIBZMQ_UNUSED
(
init_
);
#endif
#endif
}
}
...
...
src/raw_decoder.cpp
View file @
0b1589db
...
@@ -36,13 +36,13 @@
...
@@ -36,13 +36,13 @@
zmq
::
raw_decoder_t
::
raw_decoder_t
(
size_t
bufsize_
)
:
_allocator
(
bufsize_
,
1
)
zmq
::
raw_decoder_t
::
raw_decoder_t
(
size_t
bufsize_
)
:
_allocator
(
bufsize_
,
1
)
{
{
int
rc
=
_in_progress
.
init
();
const
int
rc
=
_in_progress
.
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
zmq
::
raw_decoder_t
::~
raw_decoder_t
()
zmq
::
raw_decoder_t
::~
raw_decoder_t
()
{
{
int
rc
=
_in_progress
.
close
();
const
int
rc
=
_in_progress
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
...
@@ -56,7 +56,7 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_,
...
@@ -56,7 +56,7 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_,
size_t
size_
,
size_t
size_
,
size_t
&
bytes_used_
)
size_t
&
bytes_used_
)
{
{
int
rc
=
const
int
rc
=
_in_progress
.
init
(
const_cast
<
unsigned
char
*>
(
data_
),
size_
,
_in_progress
.
init
(
const_cast
<
unsigned
char
*>
(
data_
),
size_
,
shared_message_memory_allocator
::
call_dec_ref
,
shared_message_memory_allocator
::
call_dec_ref
,
_allocator
.
buffer
(),
_allocator
.
provide_content
());
_allocator
.
buffer
(),
_allocator
.
provide_content
());
...
...
src/raw_decoder.hpp
View file @
0b1589db
...
@@ -50,7 +50,7 @@ class raw_decoder_t : public i_decoder
...
@@ -50,7 +50,7 @@ class raw_decoder_t : public i_decoder
virtual
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
);
virtual
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
);
virtual
int
virtual
int
decode
(
const
unsigned
char
*
data_
,
size_t
size_
,
size_t
&
proces
sed_
);
decode
(
const
unsigned
char
*
data_
,
size_t
size_
,
size_t
&
bytes_u
sed_
);
virtual
msg_t
*
msg
()
{
return
&
_in_progress
;
}
virtual
msg_t
*
msg
()
{
return
&
_in_progress
;
}
...
...
src/router.cpp
View file @
0b1589db
...
@@ -397,9 +397,9 @@ bool zmq::router_t::xhas_in ()
...
@@ -397,9 +397,9 @@ bool zmq::router_t::xhas_in ()
return
true
;
return
true
;
}
}
static
bool
check_pipe_hwm
(
const
zmq
::
pipe_t
&
pipe
)
static
bool
check_pipe_hwm
(
const
zmq
::
pipe_t
&
pipe
_
)
{
{
return
pipe
.
check_hwm
();
return
pipe
_
.
check_hwm
();
}
}
bool
zmq
::
router_t
::
xhas_out
()
bool
zmq
::
router_t
::
xhas_out
()
...
@@ -424,7 +424,10 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
...
@@ -424,7 +424,10 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
{
{
int
res
=
0
;
int
res
=
0
;
blob_t
routing_id_blob
((
unsigned
char
*
)
routing_id_
,
routing_id_size_
);
// TODO remove the const_cast, see comment in lookup_out_pipe
const
blob_t
routing_id_blob
(
static_cast
<
unsigned
char
*>
(
const_cast
<
void
*>
(
routing_id_
)),
routing_id_size_
);
const
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
routing_id_blob
);
const
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
routing_id_blob
);
if
(
!
out_pipe
)
{
if
(
!
out_pipe
)
{
errno
=
EHOSTUNREACH
;
errno
=
EHOSTUNREACH
;
...
...
src/router.hpp
View file @
0b1589db
...
@@ -62,7 +62,7 @@ class router_t : public routing_socket_base_t
...
@@ -62,7 +62,7 @@ class router_t : public routing_socket_base_t
bool
xhas_out
();
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
int
get_peer_state
(
const
void
*
identity_
,
size_t
identity
_size_
)
const
;
int
get_peer_state
(
const
void
*
routing_id_
,
size_t
routing_id
_size_
)
const
;
protected
:
protected
:
// Rollback any message parts that were sent but not yet flushed.
// Rollback any message parts that were sent but not yet flushed.
...
@@ -71,7 +71,7 @@ class router_t : public routing_socket_base_t
...
@@ -71,7 +71,7 @@ class router_t : public routing_socket_base_t
private
:
private
:
// Receive peer id and update lookup map
// Receive peer id and update lookup map
bool
identify_peer
(
pipe_t
*
pipe_
,
bool
locally_initiated
);
bool
identify_peer
(
pipe_t
*
pipe_
,
bool
locally_initiated
_
);
// Fair queueing object for inbound pipes.
// Fair queueing object for inbound pipes.
fq_t
_fq
;
fq_t
_fq
;
...
...
src/signaler.cpp
View file @
0b1589db
...
@@ -242,7 +242,7 @@ int zmq::signaler_t::wait (int timeout_)
...
@@ -242,7 +242,7 @@ int zmq::signaler_t::wait (int timeout_)
struct
pollfd
pfd
;
struct
pollfd
pfd
;
pfd
.
fd
=
_r
;
pfd
.
fd
=
_r
;
pfd
.
events
=
POLLIN
;
pfd
.
events
=
POLLIN
;
int
rc
=
poll
(
&
pfd
,
1
,
timeout_
);
const
int
rc
=
poll
(
&
pfd
,
1
,
timeout_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
errno_assert
(
errno
==
EINTR
);
errno_assert
(
errno
==
EINTR
);
return
-
1
;
return
-
1
;
...
@@ -319,7 +319,7 @@ void zmq::signaler_t::recv ()
...
@@ -319,7 +319,7 @@ void zmq::signaler_t::recv ()
#else
#else
unsigned
char
dummy
;
unsigned
char
dummy
;
#if defined ZMQ_HAVE_WINDOWS
#if defined ZMQ_HAVE_WINDOWS
int
nbytes
=
const
int
nbytes
=
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
wsa_assert
(
nbytes
!=
SOCKET_ERROR
);
wsa_assert
(
nbytes
!=
SOCKET_ERROR
);
#elif defined ZMQ_HAVE_VXWORKS
#elif defined ZMQ_HAVE_VXWORKS
...
@@ -343,7 +343,7 @@ int zmq::signaler_t::recv_failable ()
...
@@ -343,7 +343,7 @@ int zmq::signaler_t::recv_failable ()
if
(
sz
==
-
1
)
{
if
(
sz
==
-
1
)
{
errno_assert
(
errno
==
EAGAIN
);
errno_assert
(
errno
==
EAGAIN
);
return
-
1
;
return
-
1
;
}
else
{
}
errno_assert
(
sz
==
sizeof
(
dummy
));
errno_assert
(
sz
==
sizeof
(
dummy
));
// If we accidentally grabbed the next signal(s) along with the current
// If we accidentally grabbed the next signal(s) along with the current
...
@@ -356,11 +356,11 @@ int zmq::signaler_t::recv_failable ()
...
@@ -356,11 +356,11 @@ int zmq::signaler_t::recv_failable ()
}
}
zmq_assert
(
dummy
==
1
);
zmq_assert
(
dummy
==
1
);
}
#else
#else
unsigned
char
dummy
;
unsigned
char
dummy
;
#if defined ZMQ_HAVE_WINDOWS
#if defined ZMQ_HAVE_WINDOWS
int
nbytes
=
const
int
nbytes
=
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
::
recv
(
_r
,
reinterpret_cast
<
char
*>
(
&
dummy
),
sizeof
(
dummy
),
0
);
if
(
nbytes
==
SOCKET_ERROR
)
{
if
(
nbytes
==
SOCKET_ERROR
)
{
const
int
last_error
=
WSAGetLastError
();
const
int
last_error
=
WSAGetLastError
();
...
...
src/socket_base.cpp
View file @
0b1589db
...
@@ -97,7 +97,7 @@
...
@@ -97,7 +97,7 @@
#include "scatter.hpp"
#include "scatter.hpp"
#include "dgram.hpp"
#include "dgram.hpp"
bool
zmq
::
socket_base_t
::
check_tag
()
bool
zmq
::
socket_base_t
::
check_tag
()
const
{
{
return
_tag
==
0xbaddecaf
;
return
_tag
==
0xbaddecaf
;
}
}
...
@@ -253,7 +253,7 @@ zmq::socket_base_t::~socket_base_t ()
...
@@ -253,7 +253,7 @@ zmq::socket_base_t::~socket_base_t ()
zmq_assert
(
_destroyed
);
zmq_assert
(
_destroyed
);
}
}
zmq
::
i_mailbox
*
zmq
::
socket_base_t
::
get_mailbox
()
zmq
::
i_mailbox
*
zmq
::
socket_base_t
::
get_mailbox
()
const
{
{
return
_mailbox
;
return
_mailbox
;
}
}
...
@@ -274,7 +274,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
...
@@ -274,7 +274,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
zmq_assert
(
uri_
!=
NULL
);
zmq_assert
(
uri_
!=
NULL
);
std
::
string
uri
(
uri_
);
std
::
string
uri
(
uri_
);
std
::
string
::
size_type
pos
=
uri
.
find
(
"://"
);
const
std
::
string
::
size_type
pos
=
uri
.
find
(
"://"
);
if
(
pos
==
std
::
string
::
npos
)
{
if
(
pos
==
std
::
string
::
npos
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
...
@@ -289,7 +289,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
...
@@ -289,7 +289,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
return
0
;
return
0
;
}
}
int
zmq
::
socket_base_t
::
check_protocol
(
const
std
::
string
&
protocol_
)
int
zmq
::
socket_base_t
::
check_protocol
(
const
std
::
string
&
protocol_
)
const
{
{
// First check out whether the protocol is something we are aware of.
// First check out whether the protocol is something we are aware of.
if
(
protocol_
!=
"inproc"
if
(
protocol_
!=
"inproc"
...
@@ -413,7 +413,7 @@ int zmq::socket_base_t::getsockopt (int option_,
...
@@ -413,7 +413,7 @@ int zmq::socket_base_t::getsockopt (int option_,
}
}
if
(
option_
==
ZMQ_EVENTS
)
{
if
(
option_
==
ZMQ_EVENTS
)
{
int
rc
=
process_commands
(
0
,
false
);
const
int
rc
=
process_commands
(
0
,
false
);
if
(
rc
!=
0
&&
(
errno
==
EINTR
||
errno
==
ETERM
))
{
if
(
rc
!=
0
&&
(
errno
==
EINTR
||
errno
==
ETERM
))
{
return
-
1
;
return
-
1
;
}
}
...
@@ -439,20 +439,14 @@ int zmq::socket_base_t::join (const char *group_)
...
@@ -439,20 +439,14 @@ int zmq::socket_base_t::join (const char *group_)
{
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
int
rc
=
xjoin
(
group_
);
return
xjoin
(
group_
);
return
rc
;
}
}
int
zmq
::
socket_base_t
::
leave
(
const
char
*
group_
)
int
zmq
::
socket_base_t
::
leave
(
const
char
*
group_
)
{
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
int
rc
=
xleave
(
group_
);
return
xleave
(
group_
);
return
rc
;
}
}
void
zmq
::
socket_base_t
::
add_signaler
(
signaler_t
*
s_
)
void
zmq
::
socket_base_t
::
add_signaler
(
signaler_t
*
s_
)
...
@@ -589,7 +583,8 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -589,7 +583,8 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
options
.
connected
=
true
;
return
0
;
return
0
;
}
}
...
@@ -610,7 +605,8 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -610,7 +605,8 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
options
.
connected
=
true
;
return
0
;
return
0
;
}
}
...
@@ -630,7 +626,7 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -630,7 +626,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
addr_
,
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
addr_
,
static_cast
<
own_t
*>
(
listener
)
,
NULL
);
options
.
connected
=
true
;
options
.
connected
=
true
;
return
0
;
return
0
;
}
}
...
@@ -649,7 +645,8 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -649,7 +645,8 @@ int zmq::socket_base_t::bind (const char *addr_)
listener
->
get_address
(
_last_endpoint
);
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
_last_endpoint
.
c_str
(),
(
own_t
*
)
listener
,
NULL
);
add_endpoint
(
_last_endpoint
.
c_str
(),
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
options
.
connected
=
true
;
return
0
;
return
0
;
}
}
...
@@ -687,7 +684,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -687,7 +684,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// is in place we should follow generic pipe creation algorithm.
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
// Find the peer endpoint.
endpoint_t
peer
=
find_endpoint
(
addr_
);
const
endpoint_t
peer
=
find_endpoint
(
addr_
);
// The total HWM for an inproc connection should be the sum of
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
// the binder's HWM and the connector's HWM.
...
@@ -706,7 +703,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -706,7 +703,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t
*
parents
[
2
]
=
{
this
,
peer
.
socket
==
NULL
?
this
:
peer
.
socket
};
object_t
*
parents
[
2
]
=
{
this
,
peer
.
socket
==
NULL
?
this
:
peer
.
socket
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
bool
conflate
=
const
bool
conflate
=
options
.
conflate
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
...
@@ -733,7 +730,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -733,7 +730,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
new_pipes
[
0
]
->
flush
();
...
@@ -748,7 +745,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -748,7 +745,7 @@ int zmq::socket_base_t::connect (const char *addr_)
memcpy
(
id
.
data
(),
options
.
routing_id
,
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
new_pipes
[
0
]
->
flush
();
}
}
...
@@ -761,7 +758,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -761,7 +758,7 @@ int zmq::socket_base_t::connect (const char *addr_)
memcpy
(
id
.
data
(),
peer
.
options
.
routing_id
,
memcpy
(
id
.
data
(),
peer
.
options
.
routing_id
,
peer
.
options
.
routing_id_size
);
peer
.
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
const
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
1
]
->
flush
();
new_pipes
[
1
]
->
flush
();
}
}
...
@@ -935,7 +932,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -935,7 +932,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM does not support subscription forwarding; ask for all data to be
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. (same for NORM, currently?)
// sent to this pipe. (same for NORM, currently?)
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
const
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
||
protocol
==
"norm"
||
protocol
==
protocol_name
::
udp
;
||
protocol
==
protocol_name
::
udp
;
pipe_t
*
newpipe
=
NULL
;
pipe_t
*
newpipe
=
NULL
;
...
@@ -945,7 +942,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -945,7 +942,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t
*
parents
[
2
]
=
{
this
,
session
};
object_t
*
parents
[
2
]
=
{
this
,
session
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
bool
conflate
=
const
bool
conflate
=
options
.
conflate
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
...
@@ -968,7 +965,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -968,7 +965,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI
// Save last endpoint URI
paddr
->
to_string
(
_last_endpoint
);
paddr
->
to_string
(
_last_endpoint
);
add_endpoint
(
addr_
,
(
own_t
*
)
session
,
newpipe
);
add_endpoint
(
addr_
,
static_cast
<
own_t
*>
(
session
)
,
newpipe
);
return
0
;
return
0
;
}
}
...
@@ -1019,7 +1016,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
...
@@ -1019,7 +1016,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
if
(
unregister_endpoint
(
addr_str
,
this
)
==
0
)
{
if
(
unregister_endpoint
(
addr_str
,
this
)
==
0
)
{
return
0
;
return
0
;
}
}
std
::
pair
<
inprocs_t
::
iterator
,
inprocs_t
::
iterator
>
range
=
const
std
::
pair
<
inprocs_t
::
iterator
,
inprocs_t
::
iterator
>
range
=
_inprocs
.
equal_range
(
addr_str
);
_inprocs
.
equal_range
(
addr_str
);
if
(
range
.
first
==
range
.
second
)
{
if
(
range
.
first
==
range
.
second
)
{
errno
=
ENOENT
;
errno
=
ENOENT
;
...
@@ -1126,7 +1123,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
...
@@ -1126,7 +1123,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
// If the timeout is infinite, don't care.
int
timeout
=
options
.
sndtimeo
;
int
timeout
=
options
.
sndtimeo
;
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
const
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
// Oops, we couldn't send the message. Wait for the next
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
// command, process it and try to send the message again.
...
@@ -1218,7 +1215,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
...
@@ -1218,7 +1215,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
// If the timeout is infinite, don't care.
int
timeout
=
options
.
rcvtimeo
;
int
timeout
=
options
.
rcvtimeo
;
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
const
uint64_t
end
=
timeout
<
0
?
0
:
(
_clock
.
now_ms
()
+
timeout
);
// In blocking scenario, commands are processed over and over again until
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
// we are able to fetch a message.
...
@@ -1715,7 +1712,7 @@ void zmq::socket_base_t::event (const std::string &addr_,
...
@@ -1715,7 +1712,7 @@ void zmq::socket_base_t::event (const std::string &addr_,
// Send a monitor event
// Send a monitor event
void
zmq
::
socket_base_t
::
monitor_event
(
int
event_
,
void
zmq
::
socket_base_t
::
monitor_event
(
int
event_
,
intptr_t
value_
,
intptr_t
value_
,
const
std
::
string
&
addr_
)
const
std
::
string
&
addr_
)
const
{
{
// this is a private method which is only called from
// this is a private method which is only called from
// contexts where the mutex has been locked before
// contexts where the mutex has been locked before
...
@@ -1805,7 +1802,7 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
...
@@ -1805,7 +1802,7 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
return
res
;
return
res
;
}
}
bool
zmq
::
routing_socket_base_t
::
connect_routing_id_is_set
()
bool
zmq
::
routing_socket_base_t
::
connect_routing_id_is_set
()
const
{
{
return
!
_connect_routing_id
.
empty
();
return
!
_connect_routing_id
.
empty
();
}
}
...
@@ -1838,7 +1835,7 @@ const zmq::routing_socket_base_t::out_pipe_t *
...
@@ -1838,7 +1835,7 @@ const zmq::routing_socket_base_t::out_pipe_t *
zmq
::
routing_socket_base_t
::
lookup_out_pipe
(
const
blob_t
&
routing_id_
)
const
zmq
::
routing_socket_base_t
::
lookup_out_pipe
(
const
blob_t
&
routing_id_
)
const
{
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id_
);
const
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id_
);
return
it
==
_out_pipes
.
end
()
?
NULL
:
&
it
->
second
;
return
it
==
_out_pipes
.
end
()
?
NULL
:
&
it
->
second
;
}
}
...
...
src/socket_base.hpp
View file @
0b1589db
...
@@ -63,7 +63,7 @@ class socket_base_t : public own_t,
...
@@ -63,7 +63,7 @@ class socket_base_t : public own_t,
public
:
public
:
// Returns false if object is not a socket.
// Returns false if object is not a socket.
bool
check_tag
();
bool
check_tag
()
const
;
// Returns whether the socket is thread-safe.
// Returns whether the socket is thread-safe.
bool
is_thread_safe
()
const
;
bool
is_thread_safe
()
const
;
...
@@ -73,7 +73,7 @@ class socket_base_t : public own_t,
...
@@ -73,7 +73,7 @@ class socket_base_t : public own_t,
create
(
int
type_
,
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
);
create
(
int
type_
,
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
);
// Returns the mailbox associated with this socket.
// Returns the mailbox associated with this socket.
i_mailbox
*
get_mailbox
();
i_mailbox
*
get_mailbox
()
const
;
// Interrupt blocking call if the socket is stuck in one.
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
// This function can be called from a different thread!
...
@@ -190,7 +190,8 @@ class socket_base_t : public own_t,
...
@@ -190,7 +190,8 @@ class socket_base_t : public own_t,
void
event
(
const
std
::
string
&
addr_
,
intptr_t
value_
,
int
type_
);
void
event
(
const
std
::
string
&
addr_
,
intptr_t
value_
,
int
type_
);
// Socket event data dispatch
// Socket event data dispatch
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
);
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
)
const
;
// Monitor socket cleanup
// Monitor socket cleanup
void
stop_monitor
(
bool
send_monitor_stopped_event_
=
true
);
void
stop_monitor
(
bool
send_monitor_stopped_event_
=
true
);
...
@@ -227,12 +228,12 @@ class socket_base_t : public own_t,
...
@@ -227,12 +228,12 @@ class socket_base_t : public own_t,
bool
_destroyed
;
bool
_destroyed
;
// Parse URI string.
// Parse URI string.
int
static
int
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
address_
);
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
address_
);
// Check whether transport protocol, as specified in connect or
// Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
// bind, is available and compatible with the socket type.
int
check_protocol
(
const
std
::
string
&
protocol_
);
int
check_protocol
(
const
std
::
string
&
protocol_
)
const
;
// Register the pipe with this socket.
// Register the pipe with this socket.
void
attach_pipe
(
zmq
::
pipe_t
*
pipe_
,
void
attach_pipe
(
zmq
::
pipe_t
*
pipe_
,
...
@@ -314,7 +315,7 @@ class routing_socket_base_t : public socket_base_t
...
@@ -314,7 +315,7 @@ class routing_socket_base_t : public socket_base_t
// own methods
// own methods
std
::
string
extract_connect_routing_id
();
std
::
string
extract_connect_routing_id
();
bool
connect_routing_id_is_set
();
bool
connect_routing_id_is_set
()
const
;
struct
out_pipe_t
struct
out_pipe_t
{
{
...
...
src/socks.cpp
View file @
0b1589db
...
@@ -33,6 +33,7 @@
...
@@ -33,6 +33,7 @@
#include "err.hpp"
#include "err.hpp"
#include "socks.hpp"
#include "socks.hpp"
#include "tcp.hpp"
#include "tcp.hpp"
#include "blob.hpp"
#ifndef ZMQ_HAVE_WINDOWS
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/socket.h>
#include <sys/socket.h>
...
@@ -132,7 +133,7 @@ zmq::socks_request_t::socks_request_t (uint8_t command_,
...
@@ -132,7 +133,7 @@ zmq::socks_request_t::socks_request_t (uint8_t command_,
std
::
string
hostname_
,
std
::
string
hostname_
,
uint16_t
port_
)
:
uint16_t
port_
)
:
command
(
command_
),
command
(
command_
),
hostname
(
hostname_
),
hostname
(
ZMQ_MOVE
(
hostname_
)
),
port
(
port_
)
port
(
port_
)
{
{
zmq_assert
(
hostname_
.
size
()
<=
UINT8_MAX
);
zmq_assert
(
hostname_
.
size
()
<=
UINT8_MAX
);
...
...
src/tcp.cpp
View file @
0b1589db
...
@@ -236,7 +236,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
...
@@ -236,7 +236,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
return
nbytes
;
return
nbytes
;
#else
#else
ssize_t
nbytes
=
send
(
s_
,
(
char
*
)
data_
,
size_
,
0
);
ssize_t
nbytes
=
send
(
s_
,
static_cast
<
const
char
*>
(
data_
)
,
size_
,
0
);
// Several errors are OK. When speculative write is being done we may not
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// be able to write a single byte from the socket. Also, SIGSTOP issued
...
...
src/tcp_address.cpp
View file @
0b1589db
...
@@ -308,7 +308,7 @@ bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss_,
...
@@ -308,7 +308,7 @@ bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss_,
mask
=
_address_mask
;
mask
=
_address_mask
;
const
size_t
full_bytes
=
mask
/
8
;
const
size_t
full_bytes
=
mask
/
8
;
if
(
memcmp
(
our_bytes
,
their_bytes
,
full_bytes
))
if
(
memcmp
(
our_bytes
,
their_bytes
,
full_bytes
)
!=
0
)
return
false
;
return
false
;
const
uint8_t
last_byte_bits
=
0xffU
<<
(
8
-
mask
%
8
);
const
uint8_t
last_byte_bits
=
0xffU
<<
(
8
-
mask
%
8
);
...
...
src/trie.cpp
View file @
0b1589db
...
@@ -62,7 +62,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
...
@@ -62,7 +62,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
return
_refcnt
==
1
;
return
_refcnt
==
1
;
}
}
unsigned
char
c
=
*
prefix_
;
const
unsigned
char
c
=
*
prefix_
;
if
(
c
<
_min
||
c
>=
_min
+
_count
)
{
if
(
c
<
_min
||
c
>=
_min
+
_count
)
{
// The character is out of range of currently handled
// The character is out of range of currently handled
// characters. We have to extend the table.
// characters. We have to extend the table.
...
@@ -71,7 +71,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
...
@@ -71,7 +71,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
_count
=
1
;
_count
=
1
;
_next
.
node
=
NULL
;
_next
.
node
=
NULL
;
}
else
if
(
_count
==
1
)
{
}
else
if
(
_count
==
1
)
{
unsigned
char
oldc
=
_min
;
const
unsigned
char
oldc
=
_min
;
trie_t
*
oldp
=
_next
.
node
;
trie_t
*
oldp
=
_next
.
node
;
_count
=
(
_min
<
c
?
c
-
_min
:
_min
-
c
)
+
1
;
_count
=
(
_min
<
c
?
c
-
_min
:
_min
-
c
)
+
1
;
_next
.
table
=
_next
.
table
=
...
@@ -83,19 +83,19 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
...
@@ -83,19 +83,19 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
_next
.
table
[
oldc
-
_min
]
=
oldp
;
_next
.
table
[
oldc
-
_min
]
=
oldp
;
}
else
if
(
_min
<
c
)
{
}
else
if
(
_min
<
c
)
{
// The new character is above the current character range.
// The new character is above the current character range.
unsigned
short
old_count
=
_count
;
const
unsigned
short
old_count
=
_count
;
_count
=
c
-
_min
+
1
;
_count
=
c
-
_min
+
1
;
_next
.
table
=
static_cast
<
trie_t
**>
(
_next
.
table
=
static_cast
<
trie_t
**>
(
realloc
(
(
void
*
)
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
realloc
(
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
zmq_assert
(
_next
.
table
);
zmq_assert
(
_next
.
table
);
for
(
unsigned
short
i
=
old_count
;
i
!=
_count
;
i
++
)
for
(
unsigned
short
i
=
old_count
;
i
!=
_count
;
i
++
)
_next
.
table
[
i
]
=
NULL
;
_next
.
table
[
i
]
=
NULL
;
}
else
{
}
else
{
// The new character is below the current character range.
// The new character is below the current character range.
unsigned
short
old_count
=
_count
;
const
unsigned
short
old_count
=
_count
;
_count
=
(
_min
+
old_count
)
-
c
;
_count
=
(
_min
+
old_count
)
-
c
;
_next
.
table
=
static_cast
<
trie_t
**>
(
_next
.
table
=
static_cast
<
trie_t
**>
(
realloc
(
(
void
*
)
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
realloc
(
_next
.
table
,
sizeof
(
trie_t
*
)
*
_count
));
zmq_assert
(
_next
.
table
);
zmq_assert
(
_next
.
table
);
memmove
(
_next
.
table
+
_min
-
c
,
_next
.
table
,
memmove
(
_next
.
table
+
_min
-
c
,
_next
.
table
,
old_count
*
sizeof
(
trie_t
*
));
old_count
*
sizeof
(
trie_t
*
));
...
@@ -133,7 +133,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
...
@@ -133,7 +133,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
_refcnt
--
;
_refcnt
--
;
return
_refcnt
==
0
;
return
_refcnt
==
0
;
}
}
unsigned
char
c
=
*
prefix_
;
const
unsigned
char
c
=
*
prefix_
;
if
(
!
_count
||
c
<
_min
||
c
>=
_min
+
_count
)
if
(
!
_count
||
c
<
_min
||
c
>=
_min
+
_count
)
return
false
;
return
false
;
...
@@ -142,7 +142,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
...
@@ -142,7 +142,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
if
(
!
next_node
)
if
(
!
next_node
)
return
false
;
return
false
;
bool
ret
=
next_node
->
rm
(
prefix_
+
1
,
size_
-
1
);
const
bool
ret
=
next_node
->
rm
(
prefix_
+
1
,
size_
-
1
);
// Prune redundant nodes
// Prune redundant nodes
if
(
next_node
->
is_redundant
())
{
if
(
next_node
->
is_redundant
())
{
...
@@ -252,7 +252,7 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
...
@@ -252,7 +252,7 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
// If there's no corresponding slot for the first character
// If there's no corresponding slot for the first character
// of the prefix, the message does not match.
// of the prefix, the message does not match.
unsigned
char
c
=
*
data_
;
const
unsigned
char
c
=
*
data_
;
if
(
c
<
current
->
_min
||
c
>=
current
->
_min
+
current
->
_count
)
if
(
c
<
current
->
_min
||
c
>=
current
->
_min
+
current
->
_count
)
return
false
;
return
false
;
...
@@ -283,7 +283,7 @@ void zmq::trie_t::apply_helper (unsigned char **buff_,
...
@@ -283,7 +283,7 @@ void zmq::trie_t::apply_helper (unsigned char **buff_,
void
(
*
func_
)
(
unsigned
char
*
data_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
),
void
*
arg_
)
void
*
arg_
)
const
{
{
// If this node is a subscription, apply the function.
// If this node is a subscription, apply the function.
if
(
_refcnt
)
if
(
_refcnt
)
...
...
src/trie.hpp
View file @
0b1589db
...
@@ -64,7 +64,7 @@ class trie_t
...
@@ -64,7 +64,7 @@ class trie_t
void
(
*
func_
)
(
unsigned
char
*
data_
,
void
(
*
func_
)
(
unsigned
char
*
data_
,
size_t
size_
,
size_t
size_
,
void
*
arg_
),
void
*
arg_
),
void
*
arg_
);
void
*
arg_
)
const
;
bool
is_redundant
()
const
;
bool
is_redundant
()
const
;
uint32_t
_refcnt
;
uint32_t
_refcnt
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment