Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ec98916e
Commit
ec98916e
authored
Aug 20, 2015
by
Constantin Rack
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1541 from jemc/master
Problem: Source files contain mixed tabs and spaces.
parents
6aa5c20b
61217a26
Hide whitespace changes
Inline
Side-by-side
Showing
22 changed files
with
415 additions
and
416 deletions
+415
-416
zmq.h
include/zmq.h
+4
-4
blob.hpp
src/blob.hpp
+4
-4
clock.cpp
src/clock.cpp
+10
-10
condition_variable.hpp
src/condition_variable.hpp
+28
-28
ctx.hpp
src/ctx.hpp
+3
-3
gssapi_mechanism_base.cpp
src/gssapi_mechanism_base.cpp
+1
-1
gssapi_server.cpp
src/gssapi_server.cpp
+1
-1
gssapi_server.hpp
src/gssapi_server.hpp
+2
-2
mtrie.cpp
src/mtrie.cpp
+7
-7
pgm_socket.cpp
src/pgm_socket.cpp
+53
-53
pipe.cpp
src/pipe.cpp
+5
-5
socket_base.cpp
src/socket_base.cpp
+25
-26
thread.cpp
src/thread.cpp
+3
-3
xpub.cpp
src/xpub.cpp
+87
-87
xpub.hpp
src/xpub.hpp
+6
-6
zmq.cpp
src/zmq.cpp
+21
-21
test_connect_rid.cpp
tests/test_connect_rid.cpp
+9
-9
test_srcfd.cpp
tests/test_srcfd.cpp
+10
-10
test_stream_disconnect.cpp
tests/test_stream_disconnect.cpp
+32
-32
test_thread_safe.cpp
tests/test_thread_safe.cpp
+67
-67
test_xpub_manual.cpp
tests/test_xpub_manual.cpp
+30
-30
test_xpub_welcome_msg.cpp
tests/test_xpub_welcome_msg.cpp
+7
-7
No files found.
include/zmq.h
View file @
ec98916e
...
@@ -397,7 +397,7 @@ ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
...
@@ -397,7 +397,7 @@ ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
typedef
struct
zmq_pollitem_t
typedef
struct
zmq_pollitem_t
{
{
void
*
socket
;
void
*
socket
;
#if defined _WIN32
#if defined _WIN32
SOCKET
fd
;
SOCKET
fd
;
#else
#else
...
@@ -426,7 +426,7 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
...
@@ -426,7 +426,7 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
#else
#else
ZMQ_EXPORT
int
zmq_pollfd_fd
(
void
*
p
);
ZMQ_EXPORT
int
zmq_pollfd_fd
(
void
*
p
);
#endif
#endif
/******************************************************************************/
/******************************************************************************/
/* Message proxying */
/* Message proxying */
/******************************************************************************/
/******************************************************************************/
...
@@ -517,8 +517,8 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
...
@@ -517,8 +517,8 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#define ZMQ_UNUSED(object) (void)object
#define ZMQ_UNUSED(object) (void)object
#define LIBZMQ_DELETE(p_object) {\
#define LIBZMQ_DELETE(p_object) {\
delete p_object; \
delete p_object; \
p_object = 0; \
p_object = 0; \
}
}
#undef ZMQ_EXPORT
#undef ZMQ_EXPORT
...
...
src/blob.hpp
View file @
ec98916e
...
@@ -45,10 +45,10 @@ namespace std
...
@@ -45,10 +45,10 @@ namespace std
{
{
typedef
unsigned
char
char_type
;
typedef
unsigned
char
char_type
;
// Unsigned as wint_t in unsigned.
// Unsigned as wint_t in unsigned.
typedef
unsigned
long
int_type
;
typedef
unsigned
long
int_type
;
typedef
streampos
pos_type
;
typedef
streampos
pos_type
;
typedef
streamoff
off_type
;
typedef
streamoff
off_type
;
typedef
mbstate_t
state_type
;
typedef
mbstate_t
state_type
;
static
void
static
void
assign
(
char_type
&
__c1
,
const
char_type
&
__c2
)
assign
(
char_type
&
__c1
,
const
char_type
&
__c2
)
...
...
src/clock.cpp
View file @
ec98916e
...
@@ -149,16 +149,16 @@ uint64_t zmq::clock_t::now_us ()
...
@@ -149,16 +149,16 @@ uint64_t zmq::clock_t::now_us ()
// Use POSIX clock_gettime function to get precise monotonic time.
// Use POSIX clock_gettime function to get precise monotonic time.
struct
timespec
tv
;
struct
timespec
tv
;
int
rc
=
clock_gettime
(
CLOCK_MONOTONIC
,
&
tv
);
int
rc
=
clock_gettime
(
CLOCK_MONOTONIC
,
&
tv
);
// Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
// Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
// This should be a configuration check, but I looked into it and writing an
// This should be a configuration check, but I looked into it and writing an
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
// Use POSIX gettimeofday function to get precise time.
// Use POSIX gettimeofday function to get precise time.
struct
timeval
tv
;
struct
timeval
tv
;
int
rc
=
gettimeofday
(
&
tv
,
NULL
);
int
rc
=
gettimeofday
(
&
tv
,
NULL
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
return
(
tv
.
tv_sec
*
(
uint64_t
)
1000000
+
tv
.
tv_usec
);
return
(
tv
.
tv_sec
*
(
uint64_t
)
1000000
+
tv
.
tv_usec
);
}
}
return
(
tv
.
tv_sec
*
(
uint64_t
)
1000000
+
tv
.
tv_nsec
/
1000
);
return
(
tv
.
tv_sec
*
(
uint64_t
)
1000000
+
tv
.
tv_nsec
/
1000
);
#elif defined HAVE_GETHRTIME
#elif defined HAVE_GETHRTIME
...
...
src/condition_variable.hpp
View file @
ec98916e
...
@@ -47,36 +47,36 @@
...
@@ -47,36 +47,36 @@
namespace
zmq
namespace
zmq
{
{
class
condition_variable_t
class
condition_variable_t
{
{
public
:
public
:
inline
condition_variable_t
()
inline
condition_variable_t
()
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
inline
~
condition_variable_t
()
inline
~
condition_variable_t
()
{
{
}
}
inline
int
wait
(
mutex_t
*
mutex_
,
int
timeout_
)
inline
int
wait
(
mutex_t
*
mutex_
,
int
timeout_
)
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
return
-
1
;
return
-
1
;
}
}
inline
void
broadcast
()
inline
void
broadcast
()
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
private
:
private
:
// Disable copy construction and assignment.
// Disable copy construction and assignment.
condition_variable_t
(
const
condition_variable_t
&
);
condition_variable_t
(
const
condition_variable_t
&
);
void
operator
=
(
const
condition_variable_t
&
);
void
operator
=
(
const
condition_variable_t
&
);
};
};
}
}
...
@@ -95,7 +95,7 @@ namespace zmq
...
@@ -95,7 +95,7 @@ namespace zmq
inline
~
condition_variable_t
()
inline
~
condition_variable_t
()
{
{
}
}
inline
int
wait
(
mutex_t
*
mutex_
,
int
timeout_
)
inline
int
wait
(
mutex_t
*
mutex_
,
int
timeout_
)
...
@@ -110,7 +110,7 @@ namespace zmq
...
@@ -110,7 +110,7 @@ namespace zmq
if
(
rc
!=
ERROR_TIMEOUT
)
if
(
rc
!=
ERROR_TIMEOUT
)
win_assert
(
rc
);
win_assert
(
rc
);
errno
=
EAGAIN
;
errno
=
EAGAIN
;
return
-
1
;
return
-
1
;
}
}
...
@@ -161,9 +161,9 @@ namespace zmq
...
@@ -161,9 +161,9 @@ namespace zmq
if
(
timeout_
!=
-
1
)
{
if
(
timeout_
!=
-
1
)
{
struct
timespec
timeout
;
struct
timespec
timeout
;
clock_gettime
(
CLOCK_REALTIME
,
&
timeout
);
clock_gettime
(
CLOCK_REALTIME
,
&
timeout
);
timeout
.
tv_sec
+=
timeout_
/
1000
;
timeout
.
tv_sec
+=
timeout_
/
1000
;
timeout
.
tv_nsec
+=
(
timeout_
%
1000
)
*
1000000
;
timeout
.
tv_nsec
+=
(
timeout_
%
1000
)
*
1000000
;
rc
=
pthread_cond_timedwait
(
&
cond
,
mutex_
->
get_mutex
(),
&
timeout
);
rc
=
pthread_cond_timedwait
(
&
cond
,
mutex_
->
get_mutex
(),
&
timeout
);
}
}
else
else
...
...
src/ctx.hpp
View file @
ec98916e
...
@@ -86,7 +86,7 @@ namespace zmq
...
@@ -86,7 +86,7 @@ namespace zmq
// (except zmq_close).
// (except zmq_close).
// This function is non-blocking.
// This function is non-blocking.
// terminate must still be called afterwards.
// terminate must still be called afterwards.
// This function is optional, terminate will unblock any current
// This function is optional, terminate will unblock any current
// operations as well.
// operations as well.
int
shutdown
();
int
shutdown
();
...
@@ -98,7 +98,7 @@ namespace zmq
...
@@ -98,7 +98,7 @@ namespace zmq
zmq
::
socket_base_t
*
create_socket
(
int
type_
);
zmq
::
socket_base_t
*
create_socket
(
int
type_
);
void
destroy_socket
(
zmq
::
socket_base_t
*
socket_
);
void
destroy_socket
(
zmq
::
socket_base_t
*
socket_
);
// Start a new thread with proper scheduling parameters.
// Start a new thread with proper scheduling parameters.
void
start_thread
(
thread_t
&
thread_
,
thread_fn
*
tfn_
,
void
*
arg_
)
const
;
void
start_thread
(
thread_t
&
thread_
,
thread_fn
*
tfn_
,
void
*
arg_
)
const
;
// Send command to the destination thread.
// Send command to the destination thread.
...
@@ -203,7 +203,7 @@ namespace zmq
...
@@ -203,7 +203,7 @@ namespace zmq
// Is IPv6 enabled on this context?
// Is IPv6 enabled on this context?
bool
ipv6
;
bool
ipv6
;
// Thread scheduling parameters.
// Thread scheduling parameters.
int
thread_priority
;
int
thread_priority
;
int
thread_sched_policy
;
int
thread_sched_policy
;
...
...
src/gssapi_mechanism_base.cpp
View file @
ec98916e
...
@@ -178,7 +178,7 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
...
@@ -178,7 +178,7 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
const
uint8_t
flags
=
static_cast
<
char
*>
(
plaintext
.
value
)[
0
];
const
uint8_t
flags
=
static_cast
<
char
*>
(
plaintext
.
value
)[
0
];
if
(
flags
&
0x01
)
if
(
flags
&
0x01
)
msg_
->
set_flags
(
msg_t
::
more
);
msg_
->
set_flags
(
msg_t
::
more
);
if
(
flags
&
0x02
)
if
(
flags
&
0x02
)
msg_
->
set_flags
(
msg_t
::
command
);
msg_
->
set_flags
(
msg_t
::
command
);
...
...
src/gssapi_server.cpp
View file @
ec98916e
...
@@ -123,7 +123,7 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
...
@@ -123,7 +123,7 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
}
}
if
(
security_context_established
)
{
if
(
security_context_established
)
{
// Use ZAP protocol (RFC 27) to authenticate the user.
// Use ZAP protocol (RFC 27) to authenticate the user.
bool
expecting_zap_reply
=
false
;
bool
expecting_zap_reply
=
false
;
int
rc
=
session
->
zap_connect
();
int
rc
=
session
->
zap_connect
();
if
(
rc
==
0
)
{
if
(
rc
==
0
)
{
...
...
src/gssapi_server.hpp
View file @
ec98916e
...
@@ -85,8 +85,8 @@ namespace zmq
...
@@ -85,8 +85,8 @@ namespace zmq
void
accept_context
();
void
accept_context
();
int
produce_next_token
(
msg_t
*
msg_
);
int
produce_next_token
(
msg_t
*
msg_
);
int
process_next_token
(
msg_t
*
msg_
);
int
process_next_token
(
msg_t
*
msg_
);
void
send_zap_request
();
void
send_zap_request
();
int
receive_and_process_zap_reply
();
int
receive_and_process_zap_reply
();
};
};
}
}
...
...
src/mtrie.cpp
View file @
ec98916e
...
@@ -96,7 +96,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
...
@@ -96,7 +96,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count
=
1
;
count
=
1
;
next
.
node
=
NULL
;
next
.
node
=
NULL
;
}
}
else
else
if
(
count
==
1
)
{
if
(
count
==
1
)
{
unsigned
char
oldc
=
min
;
unsigned
char
oldc
=
min
;
mtrie_t
*
oldp
=
next
.
node
;
mtrie_t
*
oldp
=
next
.
node
;
...
@@ -109,7 +109,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
...
@@ -109,7 +109,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
min
=
std
::
min
(
min
,
c
);
min
=
std
::
min
(
min
,
c
);
next
.
table
[
oldc
-
min
]
=
oldp
;
next
.
table
[
oldc
-
min
]
=
oldp
;
}
}
else
else
if
(
min
<
c
)
{
if
(
min
<
c
)
{
// The new character is above the current character range.
// The new character is above the current character range.
unsigned
short
old_count
=
count
;
unsigned
short
old_count
=
count
;
...
@@ -252,7 +252,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
...
@@ -252,7 +252,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count
=
0
;
count
=
0
;
}
}
// Compact the node table if possible
// Compact the node table if possible
else
else
if
(
live_nodes
==
1
)
{
if
(
live_nodes
==
1
)
{
// If there's only one live node in the table we can
// If there's only one live node in the table we can
// switch to using the more compact single-node
// switch to using the more compact single-node
...
@@ -412,16 +412,16 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
...
@@ -412,16 +412,16 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
break
;
break
;
// If there's one subnode (optimisation).
// If there's one subnode (optimisation).
if
(
current
->
count
==
1
)
{
if
(
current
->
count
==
1
)
{
if
(
data_
[
0
]
!=
current
->
min
)
if
(
data_
[
0
]
!=
current
->
min
)
break
;
break
;
current
=
current
->
next
.
node
;
current
=
current
->
next
.
node
;
data_
++
;
data_
++
;
size_
--
;
size_
--
;
continue
;
continue
;
}
}
// If there are multiple subnodes.
// If there are multiple subnodes.
if
(
data_
[
0
]
<
current
->
min
||
data_
[
0
]
>=
if
(
data_
[
0
]
<
current
->
min
||
data_
[
0
]
>=
current
->
min
+
current
->
count
)
current
->
min
+
current
->
count
)
break
;
break
;
...
...
src/pgm_socket.cpp
View file @
ec98916e
...
@@ -82,7 +82,7 @@ int zmq::pgm_socket_t::init_address (const char *network_,
...
@@ -82,7 +82,7 @@ int zmq::pgm_socket_t::init_address (const char *network_,
}
}
*
port_number
=
atoi
(
port_delim
+
1
);
*
port_number
=
atoi
(
port_delim
+
1
);
char
network
[
256
];
char
network
[
256
];
if
(
port_delim
-
network_
>=
(
int
)
sizeof
(
network
)
-
1
)
{
if
(
port_delim
-
network_
>=
(
int
)
sizeof
(
network
)
-
1
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
...
@@ -195,24 +195,24 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -195,24 +195,24 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
}
}
{
{
const
int
rcvbuf
=
(
int
)
options
.
rcvbuf
;
const
int
rcvbuf
=
(
int
)
options
.
rcvbuf
;
if
(
rcvbuf
>=
0
)
{
if
(
rcvbuf
>=
0
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
rcvbuf
,
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
rcvbuf
,
sizeof
(
rcvbuf
)))
sizeof
(
rcvbuf
)))
goto
err_abort
;
goto
err_abort
;
}
}
const
int
sndbuf
=
(
int
)
options
.
sndbuf
;
const
int
sndbuf
=
(
int
)
options
.
sndbuf
;
if
(
sndbuf
>=
0
)
{
if
(
sndbuf
>=
0
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_SNDBUF
,
&
sndbuf
,
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_SNDBUF
,
&
sndbuf
,
sizeof
(
sndbuf
)))
sizeof
(
sndbuf
)))
goto
err_abort
;
goto
err_abort
;
}
}
const
int
max_tpdu
=
(
int
)
pgm_max_tpdu
;
const
int
max_tpdu
=
(
int
)
pgm_max_tpdu
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MTU
,
&
max_tpdu
,
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MTU
,
&
max_tpdu
,
sizeof
(
max_tpdu
)))
sizeof
(
max_tpdu
)))
goto
err_abort
;
goto
err_abort
;
}
}
if
(
receiver
)
{
if
(
receiver
)
{
...
@@ -334,28 +334,28 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -334,28 +334,28 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// Set IP level parameters.
// Set IP level parameters.
{
{
// Multicast loopback disabled by default
// Multicast loopback disabled by default
const
int
multicast_loop
=
0
;
const
int
multicast_loop
=
0
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_LOOP
,
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_LOOP
,
&
multicast_loop
,
sizeof
(
multicast_loop
)))
&
multicast_loop
,
sizeof
(
multicast_loop
)))
goto
err_abort
;
goto
err_abort
;
const
int
multicast_hops
=
options
.
multicast_hops
;
const
int
multicast_hops
=
options
.
multicast_hops
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_HOPS
,
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_HOPS
,
&
multicast_hops
,
sizeof
(
multicast_hops
)))
&
multicast_hops
,
sizeof
(
multicast_hops
)))
goto
err_abort
;
goto
err_abort
;
// Expedited Forwarding PHB for network elements, no ECN.
// Expedited Forwarding PHB for network elements, no ECN.
// Ignore return value due to varied runtime support.
// Ignore return value due to varied runtime support.
const
int
dscp
=
0x2e
<<
2
;
const
int
dscp
=
0x2e
<<
2
;
if
(
AF_INET6
!=
sa_family
)
if
(
AF_INET6
!=
sa_family
)
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TOS
,
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TOS
,
&
dscp
,
sizeof
(
dscp
));
&
dscp
,
sizeof
(
dscp
));
const
int
nonblocking
=
1
;
const
int
nonblocking
=
1
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NOBLOCK
,
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NOBLOCK
,
&
nonblocking
,
sizeof
(
nonblocking
)))
&
nonblocking
,
sizeof
(
nonblocking
)))
goto
err_abort
;
goto
err_abort
;
}
}
// Connect PGM transport to start state machine.
// Connect PGM transport to start state machine.
...
@@ -402,13 +402,13 @@ zmq::pgm_socket_t::~pgm_socket_t ()
...
@@ -402,13 +402,13 @@ zmq::pgm_socket_t::~pgm_socket_t ()
{
{
if
(
pgm_msgv
)
if
(
pgm_msgv
)
free
(
pgm_msgv
);
free
(
pgm_msgv
);
if
(
sock
)
if
(
sock
)
pgm_close
(
sock
,
TRUE
);
pgm_close
(
sock
,
TRUE
);
}
}
// Get receiver fds. receive_fd_ is signaled for incoming packets,
// Get receiver fds. receive_fd_ is signaled for incoming packets,
// waiting_pipe_fd_ is signaled for state driven events and data.
// waiting_pipe_fd_ is signaled for state driven events and data.
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
fd_t
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
fd_t
*
receive_fd_
,
fd_t
*
waiting_pipe_fd_
)
fd_t
*
waiting_pipe_fd_
)
{
{
socklen_t
socklen
;
socklen_t
socklen
;
...
@@ -430,12 +430,12 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
...
@@ -430,12 +430,12 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
zmq_assert
(
socklen
==
sizeof
(
*
waiting_pipe_fd_
));
zmq_assert
(
socklen
==
sizeof
(
*
waiting_pipe_fd_
));
}
}
// Get fds and store them into user allocated memory.
// Get fds and store them into user allocated memory.
// send_fd is for non-blocking send wire notifications.
// send_fd is for non-blocking send wire notifications.
// receive_fd_ is for incoming back-channel protocol packets.
// receive_fd_ is for incoming back-channel protocol packets.
// rdata_notify_fd_ is raised for waiting repair transmissions.
// rdata_notify_fd_ is raised for waiting repair transmissions.
// pending_notify_fd_ is for state driven events.
// pending_notify_fd_ is for state driven events.
void
zmq
::
pgm_socket_t
::
get_sender_fds
(
fd_t
*
send_fd_
,
fd_t
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_sender_fds
(
fd_t
*
send_fd_
,
fd_t
*
receive_fd_
,
fd_t
*
rdata_notify_fd_
,
fd_t
*
pending_notify_fd_
)
fd_t
*
rdata_notify_fd_
,
fd_t
*
pending_notify_fd_
)
{
{
socklen_t
socklen
;
socklen_t
socklen
;
...
@@ -475,7 +475,7 @@ void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
...
@@ -475,7 +475,7 @@ void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
{
{
size_t
nbytes
=
0
;
size_t
nbytes
=
0
;
const
int
status
=
pgm_send
(
sock
,
data_
,
data_len_
,
&
nbytes
);
const
int
status
=
pgm_send
(
sock
,
data_
,
data_len_
,
&
nbytes
);
// We have to write all data as one packet.
// We have to write all data as one packet.
...
@@ -551,7 +551,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -551,7 +551,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{
{
size_t
raw_data_len
=
0
;
size_t
raw_data_len
=
0
;
// We just sent all data from pgm_transport_recvmsgv up
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
// and have to return 0 that another engine in this thread is scheduled.
if
(
nbytes_rec
==
nbytes_processed
&&
nbytes_rec
>
0
)
{
if
(
nbytes_rec
==
nbytes_processed
&&
nbytes_rec
>
0
)
{
...
@@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert
(
nbytes_processed
==
0
);
zmq_assert
(
nbytes_processed
==
0
);
zmq_assert
(
nbytes_rec
==
0
);
zmq_assert
(
nbytes_rec
==
0
);
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport.
// from the transport.
pgm_error_t
*
pgm_error
=
NULL
;
pgm_error_t
*
pgm_error
=
NULL
;
...
@@ -590,7 +590,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -590,7 +590,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert
(
nbytes_rec
==
0
);
zmq_assert
(
nbytes_rec
==
0
);
// In case if no RDATA/ODATA caused POLLIN 0 is
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
// returned.
nbytes_rec
=
0
;
nbytes_rec
=
0
;
errno
=
EBUSY
;
errno
=
EBUSY
;
...
@@ -646,8 +646,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -646,8 +646,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Only one APDU per pgm_msgv_t structure is allowed.
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_len
==
1
);
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_len
==
1
);
struct
pgm_sk_buff_t
*
skb
=
struct
pgm_sk_buff_t
*
skb
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_skb
[
0
];
pgm_msgv
[
pgm_msgv_processed
].
msgv_skb
[
0
];
// Take pointers from pgm_msgv_t structure.
// Take pointers from pgm_msgv_t structure.
...
@@ -679,7 +679,7 @@ void zmq::pgm_socket_t::process_upstream ()
...
@@ -679,7 +679,7 @@ void zmq::pgm_socket_t::process_upstream ()
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
// No data should be returned.
// No data should be returned.
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
));
status
==
PGM_IO_STATUS_WOULD_BLOCK
));
...
@@ -698,7 +698,7 @@ int zmq::pgm_socket_t::compute_sqns (int tpdu_)
...
@@ -698,7 +698,7 @@ int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
{
// Convert rate into B/ms.
// Convert rate into B/ms.
uint64_t
rate
=
uint64_t
(
options
.
rate
)
/
8
;
uint64_t
rate
=
uint64_t
(
options
.
rate
)
/
8
;
// Compute the size of the buffer in bytes.
// Compute the size of the buffer in bytes.
uint64_t
size
=
uint64_t
(
options
.
recovery_ivl
)
*
rate
;
uint64_t
size
=
uint64_t
(
options
.
recovery_ivl
)
*
rate
;
...
...
src/pipe.cpp
View file @
ec98916e
...
@@ -515,13 +515,13 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
...
@@ -515,13 +515,13 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
if
(
inhwm_
<=
0
||
inhwmboost
<=
0
)
if
(
inhwm_
<=
0
||
inhwmboost
<=
0
)
in
=
0
;
in
=
0
;
if
(
outhwm_
<=
0
||
outhwmboost
<=
0
)
if
(
outhwm_
<=
0
||
outhwmboost
<=
0
)
out
=
0
;
out
=
0
;
lwm
=
compute_lwm
(
in
);
lwm
=
compute_lwm
(
in
);
hwm
=
out
;
hwm
=
out
;
}
}
void
zmq
::
pipe_t
::
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
)
void
zmq
::
pipe_t
::
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
)
...
...
src/socket_base.cpp
View file @
ec98916e
...
@@ -299,7 +299,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
...
@@ -299,7 +299,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
// First, register the pipe so that we can terminate it later on.
// First, register the pipe so that we can terminate it later on.
pipe_
->
set_event_sink
(
this
);
pipe_
->
set_event_sink
(
this
);
pipes
.
push_back
(
pipe_
);
pipes
.
push_back
(
pipe_
);
// Let the derived socket type know about new pipe.
// Let the derived socket type know about new pipe.
xattach_pipe
(
pipe_
,
subscribe_to_all_
);
xattach_pipe
(
pipe_
,
subscribe_to_all_
);
...
@@ -316,12 +316,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
...
@@ -316,12 +316,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
{
{
ENTER_MUTEX
();
ENTER_MUTEX
();
if
(
!
options
.
is_valid
(
option_
))
{
if
(
!
options
.
is_valid
(
option_
))
{
errno
=
EINVAL
;
errno
=
EINVAL
;
EXIT_MUTEX
();
EXIT_MUTEX
();
return
-
1
;
return
-
1
;
}
}
if
(
unlikely
(
ctx_terminated
))
{
if
(
unlikely
(
ctx_terminated
))
{
errno
=
ETERM
;
errno
=
ETERM
;
...
@@ -339,7 +338,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
...
@@ -339,7 +338,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
// If the socket type doesn't support the option, pass it to
// If the socket type doesn't support the option, pass it to
// the generic option parser.
// the generic option parser.
rc
=
options
.
setsockopt
(
option_
,
optval_
,
optvallen_
);
rc
=
options
.
setsockopt
(
option_
,
optval_
,
optvallen_
);
update_pipe_options
(
option_
);
update_pipe_options
(
option_
);
EXIT_MUTEX
();
EXIT_MUTEX
();
return
rc
;
return
rc
;
...
@@ -382,10 +381,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
...
@@ -382,10 +381,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
EXIT_MUTEX
();
EXIT_MUTEX
();
return
-
1
;
return
-
1
;
}
}
*
((
fd_t
*
)
optval_
)
=
((
mailbox_t
*
)
mailbox
)
->
get_fd
();
*
((
fd_t
*
)
optval_
)
=
((
mailbox_t
*
)
mailbox
)
->
get_fd
();
*
optvallen_
=
sizeof
(
fd_t
);
*
optvallen_
=
sizeof
(
fd_t
);
EXIT_MUTEX
();
EXIT_MUTEX
();
return
0
;
return
0
;
}
}
...
@@ -809,9 +808,9 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -809,9 +808,9 @@ int zmq::socket_base_t::connect (const char *addr_)
}
}
}
}
#endif
#endif
// TBD - Should we check address for ZMQ_HAVE_NORM???
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_OPENPGM
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
)
{
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
)
{
struct
pgm_addrinfo_t
*
res
=
NULL
;
struct
pgm_addrinfo_t
*
res
=
NULL
;
...
@@ -1027,7 +1026,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
...
@@ -1027,7 +1026,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if
(
unlikely
(
process_commands
(
timeout
,
false
)
!=
0
))
{
if
(
unlikely
(
process_commands
(
timeout
,
false
)
!=
0
))
{
EXIT_MUTEX
();
EXIT_MUTEX
();
return
-
1
;
return
-
1
;
}
}
rc
=
xsend
(
msg_
);
rc
=
xsend
(
msg_
);
if
(
rc
==
0
)
if
(
rc
==
0
)
break
;
break
;
...
@@ -1167,7 +1166,7 @@ int zmq::socket_base_t::close ()
...
@@ -1167,7 +1166,7 @@ int zmq::socket_base_t::close ()
{
{
// Mark the socket as dead
// Mark the socket as dead
tag
=
0xdeadbeef
;
tag
=
0xdeadbeef
;
// Transfer the ownership of the socket from this application thread
// Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown
// to the reaper thread which will take care of the rest of shutdown
// process.
// process.
...
@@ -1195,13 +1194,13 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
...
@@ -1195,13 +1194,13 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
if
(
!
thread_safe
)
if
(
!
thread_safe
)
fd
=
((
mailbox_t
*
)
mailbox
)
->
get_fd
();
fd
=
((
mailbox_t
*
)
mailbox
)
->
get_fd
();
else
{
else
{
ENTER_MUTEX
();
ENTER_MUTEX
();
reaper_signaler
=
new
signaler_t
();
reaper_signaler
=
new
signaler_t
();
// Add signaler to the safe mailbox
// Add signaler to the safe mailbox
fd
=
reaper_signaler
->
get_fd
();
fd
=
reaper_signaler
->
get_fd
();
((
mailbox_safe_t
*
)
mailbox
)
->
add_signaler
(
reaper_signaler
);
((
mailbox_safe_t
*
)
mailbox
)
->
add_signaler
(
reaper_signaler
);
// Send a signal to make sure reaper handle existing commands
// Send a signal to make sure reaper handle existing commands
...
@@ -1308,13 +1307,13 @@ void zmq::socket_base_t::process_term (int linger_)
...
@@ -1308,13 +1307,13 @@ void zmq::socket_base_t::process_term (int linger_)
void
zmq
::
socket_base_t
::
update_pipe_options
(
int
option_
)
void
zmq
::
socket_base_t
::
update_pipe_options
(
int
option_
)
{
{
if
(
option_
==
ZMQ_SNDHWM
||
option_
==
ZMQ_RCVHWM
)
if
(
option_
==
ZMQ_SNDHWM
||
option_
==
ZMQ_RCVHWM
)
{
{
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
{
{
pipes
[
i
]
->
set_hwms
(
options
.
rcvhwm
,
options
.
sndhwm
);
pipes
[
i
]
->
set_hwms
(
options
.
rcvhwm
,
options
.
sndhwm
);
}
}
}
}
}
}
...
@@ -1378,11 +1377,11 @@ void zmq::socket_base_t::in_event ()
...
@@ -1378,11 +1377,11 @@ void zmq::socket_base_t::in_event ()
// be destroyed.
// be destroyed.
ENTER_MUTEX
();
ENTER_MUTEX
();
// If the socket is thread safe we need to unsignal the reaper signaler
// If the socket is thread safe we need to unsignal the reaper signaler
if
(
thread_safe
)
if
(
thread_safe
)
reaper_signaler
->
recv
();
reaper_signaler
->
recv
();
process_commands
(
0
,
false
);
process_commands
(
0
,
false
);
EXIT_MUTEX
();
EXIT_MUTEX
();
check_destroy
();
check_destroy
();
...
...
src/thread.cpp
View file @
ec98916e
...
@@ -36,7 +36,7 @@
...
@@ -36,7 +36,7 @@
extern
"C"
extern
"C"
{
{
#if defined _WIN32_WCE
#if defined _WIN32_WCE
static
DWORD
thread_routine
(
LPVOID
arg_
)
static
DWORD
thread_routine
(
LPVOID
arg_
)
#else
#else
static
unsigned
int
__stdcall
thread_routine
(
void
*
arg_
)
static
unsigned
int
__stdcall
thread_routine
(
void
*
arg_
)
#endif
#endif
...
@@ -58,7 +58,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
...
@@ -58,7 +58,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
descriptor
=
(
HANDLE
)
_beginthreadex
(
NULL
,
0
,
descriptor
=
(
HANDLE
)
_beginthreadex
(
NULL
,
0
,
&::
thread_routine
,
this
,
0
,
NULL
);
&::
thread_routine
,
this
,
0
,
NULL
);
#endif
#endif
win_assert
(
descriptor
!=
NULL
);
win_assert
(
descriptor
!=
NULL
);
}
}
void
zmq
::
thread_t
::
stop
()
void
zmq
::
thread_t
::
stop
()
...
@@ -92,7 +92,7 @@ extern "C"
...
@@ -92,7 +92,7 @@ extern "C"
posix_assert
(
rc
);
posix_assert
(
rc
);
#endif
#endif
zmq
::
thread_t
*
self
=
(
zmq
::
thread_t
*
)
arg_
;
zmq
::
thread_t
*
self
=
(
zmq
::
thread_t
*
)
arg_
;
self
->
tfn
(
self
->
arg
);
self
->
tfn
(
self
->
arg
);
return
NULL
;
return
NULL
;
}
}
...
...
src/xpub.cpp
View file @
ec98916e
...
@@ -40,39 +40,39 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
...
@@ -40,39 +40,39 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
verbose_unsubs
(
false
),
verbose_unsubs
(
false
),
more
(
false
),
more
(
false
),
lossy
(
true
),
lossy
(
true
),
manual
(
false
),
manual
(
false
),
welcome_msg
()
welcome_msg
()
{
{
last_pipe
=
NULL
;
last_pipe
=
NULL
;
options
.
type
=
ZMQ_XPUB
;
options
.
type
=
ZMQ_XPUB
;
welcome_msg
.
init
();
welcome_msg
.
init
();
}
}
zmq
::
xpub_t
::~
xpub_t
()
zmq
::
xpub_t
::~
xpub_t
()
{
{
welcome_msg
.
close
();
welcome_msg
.
close
();
}
}
void
zmq
::
xpub_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
bool
subscribe_to_all_
)
void
zmq
::
xpub_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
bool
subscribe_to_all_
)
{
{
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
dist
.
attach
(
pipe_
);
dist
.
attach
(
pipe_
);
// If subscribe_to_all_ is specified, the caller would like to subscribe
// If subscribe_to_all_ is specified, the caller would like to subscribe
// to all data on this pipe, implicitly.
// to all data on this pipe, implicitly.
if
(
subscribe_to_all_
)
if
(
subscribe_to_all_
)
subscriptions
.
add
(
NULL
,
0
,
pipe_
);
subscriptions
.
add
(
NULL
,
0
,
pipe_
);
// if welcome message exist
// if welcome message exist
if
(
welcome_msg
.
size
()
>
0
)
if
(
welcome_msg
.
size
()
>
0
)
{
{
msg_t
copy
;
msg_t
copy
;
copy
.
init
();
copy
.
init
();
copy
.
copy
(
welcome_msg
);
copy
.
copy
(
welcome_msg
);
pipe_
->
write
(
&
copy
);
pipe_
->
write
(
&
copy
);
pipe_
->
flush
();
pipe_
->
flush
();
}
}
// The pipe is active when attached. Let's read the subscriptions from
// The pipe is active when attached. Let's read the subscriptions from
// it, if any.
// it, if any.
...
@@ -87,32 +87,32 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
...
@@ -87,32 +87,32 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// Apply the subscription to the trie
// Apply the subscription to the trie
unsigned
char
*
const
data
=
(
unsigned
char
*
)
sub
.
data
();
unsigned
char
*
const
data
=
(
unsigned
char
*
)
sub
.
data
();
const
size_t
size
=
sub
.
size
();
const
size_t
size
=
sub
.
size
();
if
(
size
>
0
&&
(
*
data
==
0
||
*
data
==
1
))
{
if
(
size
>
0
&&
(
*
data
==
0
||
*
data
==
1
))
{
if
(
manual
)
if
(
manual
)
{
{
last_pipe
=
pipe_
;
last_pipe
=
pipe_
;
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_metadata
.
push_back
(
sub
.
metadata
());
pending_metadata
.
push_back
(
sub
.
metadata
());
pending_flags
.
push_back
(
0
);
pending_flags
.
push_back
(
0
);
}
}
else
else
{
{
bool
unique
;
bool
unique
;
if
(
*
data
==
0
)
if
(
*
data
==
0
)
unique
=
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
unique
=
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
else
else
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
// If the (un)subscription is not a duplicate store it so that it can be
// If the (un)subscription is not a duplicate store it so that it can be
// passed to the user on next recv call unless verbose mode is enabled
// passed to the user on next recv call unless verbose mode is enabled
// which makes to pass always these messages.
// which makes to pass always these messages.
if
(
options
.
type
==
ZMQ_XPUB
&&
(
unique
||
(
*
data
==
1
&&
verbose_subs
)
||
if
(
options
.
type
==
ZMQ_XPUB
&&
(
unique
||
(
*
data
==
1
&&
verbose_subs
)
||
(
*
data
==
0
&&
verbose_unsubs
&&
verbose_subs
)))
{
(
*
data
==
0
&&
verbose_unsubs
&&
verbose_subs
)))
{
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_metadata
.
push_back
(
sub
.
metadata
());
pending_metadata
.
push_back
(
sub
.
metadata
());
pending_flags
.
push_back
(
0
);
pending_flags
.
push_back
(
0
);
}
}
}
}
}
}
else
{
else
{
// Process user message coming upstream from xsub socket
// Process user message coming upstream from xsub socket
...
@@ -131,46 +131,46 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
...
@@ -131,46 +131,46 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int
zmq
::
xpub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
int
zmq
::
xpub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
size_t
optvallen_
)
{
{
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
||
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
{
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
if
(
option_
==
ZMQ_XPUB_VERBOSE
)
if
(
option_
==
ZMQ_XPUB_VERBOSE
)
verbose_subs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
verbose_subs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
else
else
if
(
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
)
if
(
option_
==
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE
)
verbose_unsubs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
verbose_unsubs
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
else
else
if
(
option_
==
ZMQ_XPUB_NODROP
)
if
(
option_
==
ZMQ_XPUB_NODROP
)
lossy
=
(
*
static_cast
<
const
int
*>
(
optval_
)
==
0
);
lossy
=
(
*
static_cast
<
const
int
*>
(
optval_
)
==
0
);
else
else
if
(
option_
==
ZMQ_XPUB_MANUAL
)
if
(
option_
==
ZMQ_XPUB_MANUAL
)
manual
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
manual
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
}
}
else
else
if
(
option_
==
ZMQ_SUBSCRIBE
&&
manual
&&
last_pipe
!=
NULL
)
if
(
option_
==
ZMQ_SUBSCRIBE
&&
manual
&&
last_pipe
!=
NULL
)
subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
else
else
if
(
option_
==
ZMQ_UNSUBSCRIBE
&&
manual
&&
last_pipe
!=
NULL
)
if
(
option_
==
ZMQ_UNSUBSCRIBE
&&
manual
&&
last_pipe
!=
NULL
)
subscriptions
.
rm
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
subscriptions
.
rm
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
else
else
if
(
option_
==
ZMQ_XPUB_WELCOME_MSG
)
{
if
(
option_
==
ZMQ_XPUB_WELCOME_MSG
)
{
welcome_msg
.
close
();
welcome_msg
.
close
();
if
(
optvallen_
>
0
)
{
if
(
optvallen_
>
0
)
{
welcome_msg
.
init_size
(
optvallen_
);
welcome_msg
.
init_size
(
optvallen_
);
unsigned
char
*
data
=
(
unsigned
char
*
)
welcome_msg
.
data
();
unsigned
char
*
data
=
(
unsigned
char
*
)
welcome_msg
.
data
();
memcpy
(
data
,
optval_
,
optvallen_
);
memcpy
(
data
,
optval_
,
optvallen_
);
}
}
else
else
welcome_msg
.
init
();
welcome_msg
.
init
();
}
}
else
{
else
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
...
@@ -211,7 +211,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
...
@@ -211,7 +211,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
int
rc
=
-
1
;
// Assume we fail
int
rc
=
-
1
;
// Assume we fail
if
(
lossy
||
dist
.
check_hwm
())
{
if
(
lossy
||
dist
.
check_hwm
())
{
if
(
dist
.
send_to_matching
(
msg_
)
==
0
)
{
if
(
dist
.
send_to_matching
(
msg_
)
==
0
)
{
// If we are at the end of multi-part message we can mark
// If we are at the end of multi-part message we can mark
// all the pipes as non-matching.
// all the pipes as non-matching.
if
(
!
msg_more
)
if
(
!
msg_more
)
dist
.
unmatch
();
dist
.
unmatch
();
...
@@ -244,11 +244,11 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
...
@@ -244,11 +244,11 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
memcpy
(
msg_
->
data
(),
memcpy
(
msg_
->
data
(),
pending_data
.
front
().
data
(),
pending_data
.
front
().
data
(),
pending_data
.
front
().
size
());
pending_data
.
front
().
size
());
// set metadata only if there is some
// set metadata only if there is some
if
(
metadata_t
*
metadata
=
pending_metadata
.
front
())
{
if
(
metadata_t
*
metadata
=
pending_metadata
.
front
())
{
msg_
->
set_metadata
(
metadata
);
msg_
->
set_metadata
(
metadata
);
}
}
msg_
->
set_flags
(
pending_flags
.
front
());
msg_
->
set_flags
(
pending_flags
.
front
());
pending_data
.
pop_front
();
pending_data
.
pop_front
();
...
...
src/xpub.hpp
View file @
ec98916e
...
@@ -96,14 +96,14 @@ namespace zmq
...
@@ -96,14 +96,14 @@ namespace zmq
// Drop messages if HWM reached, otherwise return with EAGAIN
// Drop messages if HWM reached, otherwise return with EAGAIN
bool
lossy
;
bool
lossy
;
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
bool
manual
;
bool
manual
;
// Last pipe send subscription message, only used if xpub is on manual
// Last pipe send subscription message, only used if xpub is on manual
pipe_t
*
last_pipe
;
pipe_t
*
last_pipe
;
// Welcome message to send to pipe when attached
// Welcome message to send to pipe when attached
msg_t
welcome_msg
;
msg_t
welcome_msg
;
// List of pending (un)subscriptions, ie. those that were already
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
// applied to the trie, but not yet received by the user.
...
...
src/zmq.cpp
View file @
ec98916e
...
@@ -1065,11 +1065,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -1065,11 +1065,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno
=
ENOTSUP
;
errno
=
ENOTSUP
;
return
-
1
;
return
-
1
;
#endif
#endif
}
}
// Create pollfd
// Create pollfd
void
*
zmq_pollfd_new
()
void
*
zmq_pollfd_new
()
{
{
return
new
zmq
::
signaler_t
();
return
new
zmq
::
signaler_t
();
}
}
...
@@ -1080,7 +1080,7 @@ int zmq_pollfd_close (void* p_)
...
@@ -1080,7 +1080,7 @@ int zmq_pollfd_close (void* p_)
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
LIBZMQ_DELETE
(
s
);
LIBZMQ_DELETE
(
s
);
return
0
;
return
0
;
}
}
// Recv signal from pollfd
// Recv signal from pollfd
...
@@ -1088,7 +1088,7 @@ int zmq_pollfd_close (void* p_)
...
@@ -1088,7 +1088,7 @@ int zmq_pollfd_close (void* p_)
void
zmq_pollfd_recv
(
void
*
p_
)
void
zmq_pollfd_recv
(
void
*
p_
)
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
s
->
recv
();
s
->
recv
();
}
}
// Wait until pollfd is signalled
// Wait until pollfd is signalled
...
@@ -1096,7 +1096,7 @@ void zmq_pollfd_recv(void *p_)
...
@@ -1096,7 +1096,7 @@ void zmq_pollfd_recv(void *p_)
int
zmq_pollfd_wait
(
void
*
p_
,
int
timeout_
)
int
zmq_pollfd_wait
(
void
*
p_
,
int
timeout_
)
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
return
s
->
wait
(
timeout_
);
return
s
->
wait
(
timeout_
);
}
}
// Get pollfd fd
// Get pollfd fd
...
@@ -1108,7 +1108,7 @@ int zmq_pollfd_fd (void *p_)
...
@@ -1108,7 +1108,7 @@ int zmq_pollfd_fd (void *p_)
#endif
#endif
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
return
s
->
get_fd
();
return
s
->
get_fd
();
}
}
// Polling thread safe sockets version
// Polling thread safe sockets version
...
@@ -1153,27 +1153,27 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
...
@@ -1153,27 +1153,27 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int
thread_safe
;
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
{
&
thread_safe_size
)
==
-
1
)
{
return
-
1
;
return
-
1
;
}
}
// All thread safe sockets share same fd
// All thread safe sockets share same fd
if
(
thread_safe
)
{
if
(
thread_safe
)
{
// if poll fd is not set yet and events are set for this socket
// if poll fd is not set yet and events are set for this socket
if
(
!
use_pollfd
&&
items_
[
i
].
events
)
{
if
(
!
use_pollfd
&&
items_
[
i
].
events
)
{
use_pollfd
=
true
;
use_pollfd
=
true
;
pollfds_size
++
;
pollfds_size
++
;
}
}
}
}
else
else
pollfds_size
++
;
pollfds_size
++
;
}
}
else
else
pollfds_size
++
;
pollfds_size
++
;
}
}
if
(
pollfds_size
>
ZMQ_POLLITEMS_DFLT
)
{
if
(
pollfds_size
>
ZMQ_POLLITEMS_DFLT
)
{
pollfds
=
(
pollfd
*
)
malloc
(
pollfds_size
*
sizeof
(
pollfd
));
pollfds
=
(
pollfd
*
)
malloc
(
pollfds_size
*
sizeof
(
pollfd
));
alloc_assert
(
pollfds
);
alloc_assert
(
pollfds
);
...
@@ -1195,7 +1195,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
...
@@ -1195,7 +1195,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int
thread_safe
;
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
{
&
thread_safe_size
)
==
-
1
)
{
if
(
pollfds
!=
spollfds
)
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
free
(
pollfds
);
...
@@ -1212,7 +1212,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
...
@@ -1212,7 +1212,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
return
-
1
;
return
-
1
;
}
}
pollfds
[
pollfds_index
].
events
=
items_
[
i
].
events
?
POLLIN
:
0
;
pollfds
[
pollfds_index
].
events
=
items_
[
i
].
events
?
POLLIN
:
0
;
pollfds_index
++
;
pollfds_index
++
;
}
}
}
}
// Else, the poll item is a raw file descriptor. Just convert the
// Else, the poll item is a raw file descriptor. Just convert the
...
@@ -1374,16 +1374,16 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
...
@@ -1374,16 +1374,16 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int
thread_safe
;
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
&
thread_safe_size
)
==
-
1
)
return
-
1
;
return
-
1
;
if
(
thread_safe
&&
items_
[
i
].
events
)
{
if
(
thread_safe
&&
items_
[
i
].
events
)
{
use_pollfd
=
true
;
use_pollfd
=
true
;
FD_SET
(
zmq_pollfd_fd
(
p_
),
&
pollset_in
);
FD_SET
(
zmq_pollfd_fd
(
p_
),
&
pollset_in
);
break
;
break
;
}
}
}
}
}
}
zmq
::
fd_t
maxfd
=
0
;
zmq
::
fd_t
maxfd
=
0
;
...
@@ -1397,17 +1397,17 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
...
@@ -1397,17 +1397,17 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
int
thread_safe
;
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
&
thread_safe_size
)
==
-
1
)
return
-
1
;
return
-
1
;
if
(
!
thread_safe
)
{
if
(
!
thread_safe
)
{
zmq
::
fd_t
notify_fd
;
zmq
::
fd_t
notify_fd
;
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
notify_fd
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
notify_fd
,
&
zmq_fd_size
)
==
-
1
)
&
zmq_fd_size
)
==
-
1
)
return
-
1
;
return
-
1
;
if
(
items_
[
i
].
events
)
{
if
(
items_
[
i
].
events
)
{
FD_SET
(
notify_fd
,
&
pollset_in
);
FD_SET
(
notify_fd
,
&
pollset_in
);
if
(
maxfd
<
notify_fd
)
if
(
maxfd
<
notify_fd
)
...
...
tests/test_connect_rid.cpp
View file @
ec98916e
...
@@ -55,7 +55,7 @@ void test_stream_2_stream(){
...
@@ -55,7 +55,7 @@ void test_stream_2_stream(){
assert
(
rconn1
);
assert
(
rconn1
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
0
==
ret
);
assert
(
0
==
ret
);
// Do the connection.
// Do the connection.
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
...
@@ -67,7 +67,7 @@ void test_stream_2_stream(){
...
@@ -67,7 +67,7 @@ void test_stream_2_stream(){
assert (0 == ret);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
assert (0 == ret);
*/
*/
// Send data to the bound stream.
// Send data to the bound stream.
ret
=
zmq_send
(
rconn1
,
"conn1"
,
6
,
ZMQ_SNDMORE
);
ret
=
zmq_send
(
rconn1
,
"conn1"
,
6
,
ZMQ_SNDMORE
);
assert
(
6
==
ret
);
assert
(
6
==
ret
);
...
@@ -112,7 +112,7 @@ void test_router_2_router(bool named){
...
@@ -112,7 +112,7 @@ void test_router_2_router(bool named){
// Create connection socket.
// Create connection socket.
rconn1
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
rconn1
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
rconn1
);
assert
(
rconn1
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
0
==
ret
);
assert
(
0
==
ret
);
...
@@ -122,12 +122,12 @@ void test_router_2_router(bool named){
...
@@ -122,12 +122,12 @@ void test_router_2_router(bool named){
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_IDENTITY
,
"Y"
,
1
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_IDENTITY
,
"Y"
,
1
);
}
}
// Make call to connect using a connect_rid.
// Make call to connect using a connect_rid.
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
ret
=
zmq_connect
(
rconn1
,
bindip
);
ret
=
zmq_connect
(
rconn1
,
bindip
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
/* Uncomment to test assert on duplicate rid
/* Uncomment to test assert on duplicate rid
// Test duplicate connect attempt.
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
assert (0 == ret);
...
@@ -142,9 +142,9 @@ void test_router_2_router(bool named){
...
@@ -142,9 +142,9 @@ void test_router_2_router(bool named){
// Receive the name.
// Receive the name.
ret
=
zmq_recv
(
rbind
,
buff
,
256
,
0
);
ret
=
zmq_recv
(
rbind
,
buff
,
256
,
0
);
if
(
named
)
if
(
named
)
assert
(
ret
&&
'Y'
==
buff
[
0
]);
assert
(
ret
&&
'Y'
==
buff
[
0
]);
else
else
assert
(
ret
&&
0
==
buff
[
0
]);
assert
(
ret
&&
0
==
buff
[
0
]);
// Receive the data.
// Receive the data.
...
@@ -162,7 +162,7 @@ void test_router_2_router(bool named){
...
@@ -162,7 +162,7 @@ void test_router_2_router(bool named){
}
}
ret
=
zmq_send_const
(
rbind
,
"ok"
,
3
,
0
);
ret
=
zmq_send_const
(
rbind
,
"ok"
,
3
,
0
);
assert
(
3
==
ret
);
assert
(
3
==
ret
);
// If bound socket identity naming a problem, we'll likely see something funky here.
// If bound socket identity naming a problem, we'll likely see something funky here.
ret
=
zmq_recv
(
rconn1
,
buff
,
256
,
0
);
ret
=
zmq_recv
(
rconn1
,
buff
,
256
,
0
);
assert
(
'c'
==
buff
[
0
]
&&
6
==
ret
);
assert
(
'c'
==
buff
[
0
]
&&
6
==
ret
);
...
@@ -183,7 +183,7 @@ int main (void)
...
@@ -183,7 +183,7 @@ int main (void)
{
{
setup_test_environment
();
setup_test_environment
();
test_stream_2_stream
();
test_stream_2_stream
();
test_router_2_router
(
false
);
test_router_2_router
(
false
);
test_router_2_router
(
true
);
test_router_2_router
(
true
);
...
...
tests/test_srcfd.cpp
View file @
ec98916e
...
@@ -44,7 +44,7 @@
...
@@ -44,7 +44,7 @@
int
main
(
void
)
int
main
(
void
)
{
{
int
rc
;
int
rc
;
setup_test_environment
();
setup_test_environment
();
// Create the infrastructure
// Create the infrastructure
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
...
@@ -70,15 +70,15 @@ int main (void)
...
@@ -70,15 +70,15 @@ int main (void)
zmq_recvmsg
(
rep
,
&
msg
,
0
);
zmq_recvmsg
(
rep
,
&
msg
,
0
);
assert
(
zmq_msg_size
(
&
msg
)
==
MSG_SIZE
);
assert
(
zmq_msg_size
(
&
msg
)
==
MSG_SIZE
);
// get the messages source file descriptor
// get the messages source file descriptor
int
srcFd
=
zmq_msg_get
(
&
msg
,
ZMQ_SRCFD
);
int
srcFd
=
zmq_msg_get
(
&
msg
,
ZMQ_SRCFD
);
assert
(
srcFd
>=
0
);
assert
(
srcFd
>=
0
);
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// get the remote endpoint
// get the remote endpoint
struct
sockaddr_storage
ss
;
struct
sockaddr_storage
ss
;
#ifdef ZMQ_HAVE_HPUX
#ifdef ZMQ_HAVE_HPUX
int
addrlen
=
sizeof
ss
;
int
addrlen
=
sizeof
ss
;
...
@@ -92,7 +92,7 @@ int main (void)
...
@@ -92,7 +92,7 @@ int main (void)
rc
=
getnameinfo
((
struct
sockaddr
*
)
&
ss
,
addrlen
,
host
,
sizeof
host
,
NULL
,
0
,
NI_NUMERICHOST
);
rc
=
getnameinfo
((
struct
sockaddr
*
)
&
ss
,
addrlen
,
host
,
sizeof
host
,
NULL
,
0
,
NI_NUMERICHOST
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// assert it is localhost which connected
// assert it is localhost which connected
assert
(
strcmp
(
host
,
"127.0.0.1"
)
==
0
);
assert
(
strcmp
(
host
,
"127.0.0.1"
)
==
0
);
rc
=
zmq_close
(
rep
);
rc
=
zmq_close
(
rep
);
...
@@ -100,14 +100,14 @@ int main (void)
...
@@ -100,14 +100,14 @@ int main (void)
rc
=
zmq_close
(
req
);
rc
=
zmq_close
(
req
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// sleep a bit for the socket to be freed
// sleep a bit for the socket to be freed
usleep
(
30000
);
usleep
(
30000
);
// getting name from closed socket will fail
// getting name from closed socket will fail
rc
=
getpeername
(
srcFd
,
(
struct
sockaddr
*
)
&
ss
,
&
addrlen
);
rc
=
getpeername
(
srcFd
,
(
struct
sockaddr
*
)
&
ss
,
&
addrlen
);
assert
(
rc
==
-
1
);
assert
(
rc
==
-
1
);
assert
(
errno
==
EBADF
);
assert
(
errno
==
EBADF
);
rc
=
zmq_ctx_term
(
ctx
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
tests/test_stream_disconnect.cpp
View file @
ec98916e
...
@@ -50,7 +50,7 @@ bool has_more (void* socket)
...
@@ -50,7 +50,7 @@ bool has_more (void* socket)
int
more
=
0
;
int
more
=
0
;
size_t
more_size
=
sizeof
(
more
);
size_t
more_size
=
sizeof
(
more
);
int
rc
=
zmq_getsockopt
(
socket
,
ZMQ_RCVMORE
,
&
more
,
&
more_size
);
int
rc
=
zmq_getsockopt
(
socket
,
ZMQ_RCVMORE
,
&
more
,
&
more_size
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
false
;
return
false
;
return
more
!=
0
;
return
more
!=
0
;
}
}
...
@@ -165,18 +165,18 @@ int main(int, char**)
...
@@ -165,18 +165,18 @@ int main(int, char**)
// Grab the 1st frame (peer identity).
// Grab the 1st frame (peer identity).
zmq_msg_t
peer_frame
;
zmq_msg_t
peer_frame
;
rc
=
zmq_msg_init
(
&
peer_frame
);
rc
=
zmq_msg_init
(
&
peer_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
peer_frame
,
sockets
[
SERVER
],
0
);
rc
=
zmq_msg_recv
(
&
peer_frame
,
sockets
[
SERVER
],
0
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
assert
(
zmq_msg_size
(
&
peer_frame
)
>
0
);
assert
(
zmq_msg_size
(
&
peer_frame
)
>
0
);
assert
(
has_more
(
sockets
[
SERVER
]));
assert
(
has_more
(
sockets
[
SERVER
]));
// Grab the 2nd frame (actual payload).
// Grab the 2nd frame (actual payload).
zmq_msg_t
data_frame
;
zmq_msg_t
data_frame
;
rc
=
zmq_msg_init
(
&
data_frame
);
rc
=
zmq_msg_init
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
data_frame
,
sockets
[
SERVER
],
0
);
rc
=
zmq_msg_recv
(
&
data_frame
,
sockets
[
SERVER
],
0
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
// Make sure payload matches what we expect.
// Make sure payload matches what we expect.
const
char
*
const
data
=
(
const
char
*
)
zmq_msg_data
(
&
data_frame
);
const
char
*
const
data
=
(
const
char
*
)
zmq_msg_data
(
&
data_frame
);
...
@@ -184,39 +184,39 @@ int main(int, char**)
...
@@ -184,39 +184,39 @@ int main(int, char**)
// 0-length frame is a disconnection notification. The server
// 0-length frame is a disconnection notification. The server
// should receive it as the last step in the dialogue.
// should receive it as the last step in the dialogue.
if
(
size
==
0
)
{
if
(
size
==
0
)
{
++
step
;
++
step
;
assert
(
step
==
steps
);
assert
(
step
==
steps
);
}
}
else
{
else
{
assert
((
size_t
)
size
==
strlen
(
dialog
[
step
].
text
));
assert
((
size_t
)
size
==
strlen
(
dialog
[
step
].
text
));
int
cmp
=
memcmp
(
dialog
[
step
].
text
,
data
,
size
);
int
cmp
=
memcmp
(
dialog
[
step
].
text
,
data
,
size
);
assert
(
cmp
==
0
);
assert
(
cmp
==
0
);
++
step
;
++
step
;
assert
(
step
<
steps
);
assert
(
step
<
steps
);
// Prepare the response.
// Prepare the response.
rc
=
zmq_msg_close
(
&
data_frame
);
rc
=
zmq_msg_close
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_init_size
(
&
data_frame
,
rc
=
zmq_msg_init_size
(
&
data_frame
,
strlen
(
dialog
[
step
].
text
));
strlen
(
dialog
[
step
].
text
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
memcpy
(
zmq_msg_data
(
&
data_frame
),
dialog
[
step
].
text
,
memcpy
(
zmq_msg_data
(
&
data_frame
),
dialog
[
step
].
text
,
zmq_msg_size
(
&
data_frame
));
zmq_msg_size
(
&
data_frame
));
// Send the response.
// Send the response.
rc
=
zmq_msg_send
(
&
peer_frame
,
sockets
[
SERVER
],
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
peer_frame
,
sockets
[
SERVER
],
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
rc
=
zmq_msg_send
(
&
data_frame
,
sockets
[
SERVER
],
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
data_frame
,
sockets
[
SERVER
],
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
}
}
// Release resources.
// Release resources.
rc
=
zmq_msg_close
(
&
peer_frame
);
rc
=
zmq_msg_close
(
&
peer_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_close
(
&
data_frame
);
rc
=
zmq_msg_close
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
// Check for data received by the client.
// Check for data received by the client.
...
@@ -226,24 +226,24 @@ int main(int, char**)
...
@@ -226,24 +226,24 @@ int main(int, char**)
// Grab the 1st frame (peer identity).
// Grab the 1st frame (peer identity).
zmq_msg_t
peer_frame
;
zmq_msg_t
peer_frame
;
rc
=
zmq_msg_init
(
&
peer_frame
);
rc
=
zmq_msg_init
(
&
peer_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
peer_frame
,
sockets
[
CLIENT
],
0
);
rc
=
zmq_msg_recv
(
&
peer_frame
,
sockets
[
CLIENT
],
0
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
assert
(
zmq_msg_size
(
&
peer_frame
)
>
0
);
assert
(
zmq_msg_size
(
&
peer_frame
)
>
0
);
assert
(
has_more
(
sockets
[
CLIENT
]));
assert
(
has_more
(
sockets
[
CLIENT
]));
// Grab the 2nd frame (actual payload).
// Grab the 2nd frame (actual payload).
zmq_msg_t
data_frame
;
zmq_msg_t
data_frame
;
rc
=
zmq_msg_init
(
&
data_frame
);
rc
=
zmq_msg_init
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
data_frame
,
sockets
[
CLIENT
],
0
);
rc
=
zmq_msg_recv
(
&
data_frame
,
sockets
[
CLIENT
],
0
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
assert
(
zmq_msg_size
(
&
data_frame
)
>
0
);
assert
(
zmq_msg_size
(
&
data_frame
)
>
0
);
// Make sure payload matches what we expect.
// Make sure payload matches what we expect.
const
char
*
const
data
=
(
const
char
*
)
zmq_msg_data
(
&
data_frame
);
const
char
*
const
data
=
(
const
char
*
)
zmq_msg_data
(
&
data_frame
);
const
int
size
=
zmq_msg_size
(
&
data_frame
);
const
int
size
=
zmq_msg_size
(
&
data_frame
);
assert
((
size_t
)
size
==
strlen
(
dialog
[
step
].
text
));
assert
((
size_t
)
size
==
strlen
(
dialog
[
step
].
text
));
int
cmp
=
memcmp
(
dialog
[
step
].
text
,
data
,
size
);
int
cmp
=
memcmp
(
dialog
[
step
].
text
,
data
,
size
);
assert
(
cmp
==
0
);
assert
(
cmp
==
0
);
...
@@ -252,22 +252,22 @@ int main(int, char**)
...
@@ -252,22 +252,22 @@ int main(int, char**)
// Prepare the response (next line in the dialog).
// Prepare the response (next line in the dialog).
assert
(
step
<
steps
);
assert
(
step
<
steps
);
rc
=
zmq_msg_close
(
&
data_frame
);
rc
=
zmq_msg_close
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_init_size
(
&
data_frame
,
strlen
(
dialog
[
step
].
text
));
rc
=
zmq_msg_init_size
(
&
data_frame
,
strlen
(
dialog
[
step
].
text
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
memcpy
(
zmq_msg_data
(
&
data_frame
),
dialog
[
step
].
text
,
zmq_msg_size
(
&
data_frame
));
memcpy
(
zmq_msg_data
(
&
data_frame
),
dialog
[
step
].
text
,
zmq_msg_size
(
&
data_frame
));
// Send the response.
// Send the response.
rc
=
zmq_msg_send
(
&
peer_frame
,
sockets
[
CLIENT
],
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
peer_frame
,
sockets
[
CLIENT
],
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
rc
=
zmq_msg_send
(
&
data_frame
,
sockets
[
CLIENT
],
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
data_frame
,
sockets
[
CLIENT
],
ZMQ_SNDMORE
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
// Release resources.
// Release resources.
rc
=
zmq_msg_close
(
&
peer_frame
);
rc
=
zmq_msg_close
(
&
peer_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_msg_close
(
&
data_frame
);
rc
=
zmq_msg_close
(
&
data_frame
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
}
}
assert
(
step
==
steps
);
assert
(
step
==
steps
);
...
...
tests/test_thread_safe.cpp
View file @
ec98916e
...
@@ -56,42 +56,42 @@ int main (void)
...
@@ -56,42 +56,42 @@ int main (void)
rc
=
zmq_connect
(
client2
,
"tcp://127.0.0.1:5560"
);
rc
=
zmq_connect
(
client2
,
"tcp://127.0.0.1:5560"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
void
*
t1
=
zmq_threadstart
(
worker1
,
client2
);
void
*
t1
=
zmq_threadstart
(
worker1
,
client2
);
void
*
t2
=
zmq_threadstart
(
worker2
,
client2
);
void
*
t2
=
zmq_threadstart
(
worker2
,
client2
);
char
data
[
1
];
char
data
[
1
];
data
[
0
]
=
0
;
data
[
0
]
=
0
;
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
char
a
,
b
;
char
a
,
b
;
rc
=
zmq_recv
(
client
,
&
a
,
1
,
0
);
rc
=
zmq_recv
(
client
,
&
a
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_recv
(
client
,
&
b
,
1
,
0
);
rc
=
zmq_recv
(
client
,
&
b
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
// make sure they came from different threads
// make sure they came from different threads
assert
((
a
==
1
&&
b
==
2
)
||
(
a
==
2
&&
b
==
1
));
assert
((
a
==
1
&&
b
==
2
)
||
(
a
==
2
&&
b
==
1
));
}
}
// make the thread exit
// make the thread exit
data
[
0
]
=
1
;
data
[
0
]
=
1
;
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
zmq_threadclose
(
t1
);
zmq_threadclose
(
t1
);
zmq_threadclose
(
t2
);
zmq_threadclose
(
t2
);
rc
=
zmq_close
(
client2
);
rc
=
zmq_close
(
client2
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -107,52 +107,52 @@ int main (void)
...
@@ -107,52 +107,52 @@ int main (void)
void
worker1
(
void
*
s
)
void
worker1
(
void
*
s
)
{
{
const
char
worker_id
=
1
;
const
char
worker_id
=
1
;
char
c
;
char
c
;
while
(
true
)
while
(
true
)
{
{
int
rc
=
zmq_recv
(
s
,
&
c
,
1
,
0
);
int
rc
=
zmq_recv
(
s
,
&
c
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
if
(
c
==
0
)
if
(
c
==
0
)
{
{
msleep
(
100
);
msleep
(
100
);
rc
=
zmq_send_const
(
s
,
&
worker_id
,
1
,
0
);
rc
=
zmq_send_const
(
s
,
&
worker_id
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
}
}
else
else
{
{
// we got exit request
// we got exit request
break
;
break
;
}
}
}
}
}
}
void
worker2
(
void
*
s
)
void
worker2
(
void
*
s
)
{
{
const
char
worker_id
=
2
;
const
char
worker_id
=
2
;
char
c
;
char
c
;
while
(
true
)
while
(
true
)
{
{
int
rc
=
zmq_recv
(
s
,
&
c
,
1
,
0
);
int
rc
=
zmq_recv
(
s
,
&
c
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
c
==
1
||
c
==
0
);
assert
(
c
==
1
||
c
==
0
);
if
(
c
==
0
)
if
(
c
==
0
)
{
{
msleep
(
100
);
msleep
(
100
);
rc
=
zmq_send_const
(
s
,
&
worker_id
,
1
,
0
);
rc
=
zmq_send_const
(
s
,
&
worker_id
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
}
}
else
else
{
{
// we got exit request
// we got exit request
break
;
break
;
}
}
}
}
}
}
...
...
tests/test_xpub_manual.cpp
View file @
ec98916e
...
@@ -43,42 +43,42 @@ int main (void)
...
@@ -43,42 +43,42 @@ int main (void)
// set pub socket options
// set pub socket options
int
manual
=
1
;
int
manual
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
);
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_MANUAL
,
&
manual
,
4
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Create a subscriber
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_XSUB
);
assert
(
sub
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Subscribe for A
// Subscribe for A
char
subscription
[
2
]
=
{
1
,
'A'
};
char
subscription
[
2
]
=
{
1
,
'A'
};
rc
=
zmq_send_const
(
sub
,
subscription
,
2
,
0
);
rc
=
zmq_send_const
(
sub
,
subscription
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
rc
==
2
);
char
buffer
[
2
];
char
buffer
[
2
];
// Receive subscriptions from subscriber
// Receive subscriptions from subscriber
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
rc
=
zmq_recv
(
pub
,
buffer
,
2
,
0
);
assert
(
rc
==
2
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'A'
);
assert
(
buffer
[
1
]
==
'A'
);
// Subscribe socket for B instead
// Subscribe socket for B instead
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"B"
,
1
);
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SUBSCRIBE
,
"B"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Sending A message and B Message
// Sending A message and B Message
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
rc
=
zmq_send_const
(
pub
,
"A"
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_send_const
(
pub
,
"B"
,
1
,
0
);
rc
=
zmq_send_const
(
pub
,
"B"
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
ZMQ_DONTWAIT
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
buffer
[
0
]
==
'B'
);
assert
(
buffer
[
0
]
==
'B'
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
rc
=
zmq_close
(
pub
);
...
...
tests/test_xpub_welcome_msg.cpp
View file @
ec98916e
...
@@ -41,20 +41,20 @@ int main (void)
...
@@ -41,20 +41,20 @@ int main (void)
int
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
int
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// set pub socket options
// set pub socket options
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_WELCOME_MSG
,
"W"
,
1
);
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_WELCOME_MSG
,
"W"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Create a subscriber
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
// Subscribe to the welcome message
// Subscribe to the welcome message
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"W"
,
1
);
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
"W"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
assert
(
sub
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
char
buffer
[
2
];
char
buffer
[
2
];
...
@@ -63,11 +63,11 @@ int main (void)
...
@@ -63,11 +63,11 @@ int main (void)
assert
(
rc
==
2
);
assert
(
rc
==
2
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
'W'
);
assert
(
buffer
[
1
]
==
'W'
);
// Receive the welcome message
// Receive the welcome message
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
0
);
rc
=
zmq_recv
(
sub
,
buffer
,
1
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
buffer
[
0
]
==
'W'
);
assert
(
buffer
[
0
]
==
'W'
);
// Clean up.
// Clean up.
rc
=
zmq_close
(
pub
);
rc
=
zmq_close
(
pub
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment