Commit fb57110b authored by Joe Eli McIlvain's avatar Joe Eli McIlvain

Merge pull request #1577 from hintjens/master

Various cleanups for CLIENT-SERVER sockets
parents d0ffb913 f3ee8c69
...@@ -10,9 +10,9 @@ if(APPLE) ...@@ -10,9 +10,9 @@ if(APPLE)
endif() endif()
if(WIN32) if(WIN32)
option(WITH_TWEETNACL "Build with tweetnacl" OFF) option(WITH_TWEETNACL "Build with tweetnacl" OFF)
else() else()
option(WITH_TWEETNACL "Build with tweetnacl" ON) option(WITH_TWEETNACL "Build with tweetnacl" ON)
endif() endif()
if(WITH_TWEETNACL) if(WITH_TWEETNACL)
...@@ -23,7 +23,7 @@ if(WITH_TWEETNACL) ...@@ -23,7 +23,7 @@ if(WITH_TWEETNACL)
) )
set(TWEETNACL_SOURCES set(TWEETNACL_SOURCES
tweetnacl/src/tweetnacl.c tweetnacl/src/tweetnacl.c
) )
if(WIN32) if(WIN32)
else() else()
...@@ -157,6 +157,7 @@ check_function_exists(gethrtime HAVE_GETHRTIME) ...@@ -157,6 +157,7 @@ check_function_exists(gethrtime HAVE_GETHRTIME)
set(CMAKE_REQUIRED_INCLUDES ) set(CMAKE_REQUIRED_INCLUDES )
add_definitions(-D_REENTRANT -D_THREAD_SAFE) add_definitions(-D_REENTRANT -D_THREAD_SAFE)
add_definitions(-D_USING_CMAKE)
option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD) option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD)
......
...@@ -6,6 +6,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ ...@@ -6,6 +6,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \ zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \ zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \
zmq_msg_send.3 zmq_msg_recv.3 \ zmq_msg_send.3 zmq_msg_recv.3 \
zmq_msg_routing_id.3 zmq_msg_set_routing_id.3 \
zmq_send.3 zmq_recv.3 zmq_send_const.3 \ zmq_send.3 zmq_recv.3 zmq_send_const.3 \
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
zmq_getsockopt.3 zmq_setsockopt.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \
......
...@@ -688,6 +688,17 @@ Default value:: 0 (leave to OS default) ...@@ -688,6 +688,17 @@ Default value:: 0 (leave to OS default)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_THREADSAFE: Retrieve socket thread safety
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREADSAFE' option shall retrieve a boolean value indicating whether
or not the socket is threadsafe. Currently only 'ZMQ_CLIENT' sockets are
threadsafe.
[horizontal]
Option value type:: boolean
Applicable socket types:: all
ZMQ_TOS: Retrieve the Type-of-Service socket override status ZMQ_TOS: Retrieve the Type-of-Service socket override status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the IP_TOS option for the socket. Retrieve the IP_TOS option for the socket.
......
zmq_msg_routing_id(3)
=====================
NAME
----
zmq_msg_routing_id - return routing ID for message, if any
SYNOPSIS
--------
*uint32_t zmq_msg_routing_id (zmq_msg_t '*message');*
DESCRIPTION
-----------
The _zmq_msg_routing_id()_ function returns the routing ID for the message,
if any. The routing ID is set on all messages received from a 'ZMQ_SERVER'
socket. To send a message to a 'ZMQ_SERVER' socket you must set the routing
ID of a connected 'ZMQ_CLIENT' peer. Routing IDs are transient.
RETURN VALUE
------------
The _zmq_msg_routing_id()_ function shall return zero if there is no routing
ID, otherwise it shall return an unsigned 32-bit integer greater than zero.
EXAMPLE
-------
.Receiving a client message and routing ID
----
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
assert (server);
int rc = zmq_bind (server, "tcp://127.0.0.1:8080");
assert (rc == 0);
zmq_msg_t message;
rc = zmq_msg_init (&message);
assert (rc == 0);
// Receive a message from socket
rc = zmq_msg_recv (server, &message, 0);
assert (rc != -1);
uint32_t routing_id = zmq_msg_routing_id (&message);
assert (routing_id);
----
SEE ALSO
--------
linkzmq:zmq_msg_set_routing_id[3]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
zmq_msg_set_routing_id(3)
=========================
NAME
----
zmq_msg_set_routing_id - set routing ID property on message
SYNOPSIS
--------
*int zmq_msg_set_routing_id (zmq_msg_t '*message', uint32_t 'routing_id');*
DESCRIPTION
-----------
The _zmq_msg_set_routing_id()_ function sets the 'routing_id' specified, on the
the message pointed to by the 'message' argument. The 'routing_id' must be
greater than zero. To get a valid routing ID, you must receive a message
from a 'ZMQ_SERVER' socket, and use the libzmq:zmq_msg_routing_id method.
Routing IDs are transient.
RETURN VALUE
------------
The _zmq_msg_set_routing_id()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The provided 'routing_id' is zero.
SEE ALSO
--------
linkzmq:zmq_msg_routing_id[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
...@@ -48,7 +48,7 @@ _zmq_bind()_, thus allowing many-to-many relationships. ...@@ -48,7 +48,7 @@ _zmq_bind()_, thus allowing many-to-many relationships.
.Thread safety .Thread safety
0MQ 'sockets' are _not_ thread safe. Applications MUST NOT use a socket 0MQ 'sockets' are _not_ thread safe. Applications MUST NOT use a socket
from multiple threads except after migrating a socket from one thread to from multiple threads except after migrating a socket from one thread to
another with a "full fence" memory barrier. another with a "full fence" memory barrier.
.Socket types .Socket types
...@@ -56,73 +56,45 @@ The following sections present the socket types defined by 0MQ, grouped by the ...@@ -56,73 +56,45 @@ The following sections present the socket types defined by 0MQ, grouped by the
general _messaging pattern_ which is built from related socket types. general _messaging pattern_ which is built from related socket types.
Request-reply pattern Client-server pattern
~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~
The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
ZMQ_REQ
^^^^^^^
A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and
receive replies from a _service_. This socket type allows only an alternating
sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each
request sent is round-robined among all _services_, and each reply received is
matched with the last issued request.
If no services are available, then any send operation on the socket shall
block until at least one _service_ becomes available. The REQ socket shall
not discard messages.
[horizontal]
.Summary of ZMQ_REQ characteristics
Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Send, Receive, Send, Receive, ...
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Last peer
Action in mute state:: Block
ZMQ_REP The client-server pattern is used to allow a single 'ZMQ_SERVER' _server_ talk
^^^^^^^ to one or more 'ZMQ_CLIENT' _clients_. The client always starts the conversation,
A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and after which either peer can send messages asynchronously, to the other.
send replies to a _client_. This socket type allows only an alternating
sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each
request received is fair-queued from among all _clients_, and each reply sent
is routed to the _client_ that issued the last request. If the original
requester does not exist any more the reply is silently discarded.
[horizontal] The client-server pattern is formally defined by http://rfc.zeromq.org/spec:41.
.Summary of ZMQ_REP characteristics
Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Receive, Send, Receive, Send, ...
Incoming routing strategy:: Fair-queued
Outgoing routing strategy:: Last peer
Note: this pattern deprecates the use of 'ZMQ_DEALER' and 'ZMQ_ROUTER' to build
client-server architectures.
ZMQ_DEALER ZMQ_CLIENT
^^^^^^^^^^ ^^^^^^^^^^
A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending A 'ZMQ_CLIENT' socket talks to a 'ZMQ_SERVER' socket. Either peer can connect,
request/reply sockets. Each message sent is round-robined among all connected though the usual and recommended model is to bind the 'ZMQ_SERVER' and connect
peers, and each message received is fair-queued from all connected peers. the 'ZMQ_CLIENT'.
When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the If the 'ZMQ_CLIENT' socket has established a connection, linkzmq:zmq_send[3]
high water mark for all peers, or if there are no peers at all, then any will accept messages, queue them, and send them as rapidly as the network
linkzmq:zmq_send[3] operations on the socket shall block until the mute allows. The outgoing buffer limit is defined by the high water mark for the
state ends or at least one peer becomes available for sending; messages are not socket. If the outgoing buffer is full, or if there is no connected peer,
discarded. linkzmq:zmq_send[3] will block, by default. The 'ZMQ_CLIENT' socket will not
drop messages.
When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent
must consist of an empty message part, the _delimiter_, followed by one or more When a 'ZMQ_CLIENT' socket is connected to multiple 'ZMQ_SERVER' sockets,
_body parts_. outgoing messages are distributed between connected peers on a round-robin
basis. Likewise, the 'ZMQ_CLIENT' socket receives messages fairly from each
connected peer. This usage is sensible only for stateless protocols.
'ZMQ_CLIENT' sockets are threadsafe and can be used from multiple threads
at the same time. Note that replies from a 'ZMQ_SERVER' socket will go to
the first client thread that calls libzmq:zmq_msg_recv. If you need to get
replies back to the originating thread, use one 'ZMQ_CLIENT' socket per
thread.
[horizontal] [horizontal]
.Summary of ZMQ_DEALER characteristics .Summary of ZMQ_CLIENT characteristics
Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER' Compatible peer sockets:: 'ZMQ_SERVER'
Direction:: Bidirectional Direction:: Bidirectional
Send/receive pattern:: Unrestricted Send/receive pattern:: Unrestricted
Outgoing routing strategy:: Round-robin Outgoing routing strategy:: Round-robin
...@@ -130,38 +102,30 @@ Incoming routing strategy:: Fair-queued ...@@ -130,38 +102,30 @@ Incoming routing strategy:: Fair-queued
Action in mute state:: Block Action in mute state:: Block
ZMQ_ROUTER ZMQ_SERVER
^^^^^^^^^^ ^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending A 'ZMQ_SERVER' socket talks to a set of 'ZMQ_CLIENT' sockets. A 'ZMQ_SERVER'
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall socket can only reply to an incoming message: the 'ZMQ_CLIENT' peer must
prepend a message part containing the _identity_ of the originating peer to the always initiate a conversation.
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the Each received message has a 'routing_id' that is a 32-bit unsigned integer.
high water mark for all peers, then any messages sent to the socket shall be dropped The application can fetch this with linkzmq:zmq_msg_routing_id[3]. To send
until the mute state ends. Likewise, any messages routed to a peer for which a message to a given 'ZMQ_CLIENT' peer the application must set the peer's
the individual high water mark has been reached shall also be dropped. 'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3].
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the If the 'routing_id' is not specified, or does not refer to a connected client
_identity_ of the originating peer each message received shall contain an empty peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for
_delimiter_ message part. Hence, the entire structure of each received message the client peer is full, the send call will fail with EAGAIN. The 'ZMQ_SERVER'
as seen by the application becomes: one or more _identity_ parts, _delimiter_ socket shall not drop messages, nor shall it block.
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
[horizontal] [horizontal]
.Summary of ZMQ_ROUTER characteristics .Summary of ZMQ_SERVER characteristics
Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER' Compatible peer sockets:: 'ZMQ_CLIENT'
Direction:: Bidirectional Direction:: Bidirectional
Send/receive pattern:: Unrestricted Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued Incoming routing strategy:: Fair-queued
Action in mute state:: Drop Action in mute state:: Return EAGAIN
Publish-subscribe pattern Publish-subscribe pattern
...@@ -328,26 +292,26 @@ Action in mute state:: Block ...@@ -328,26 +292,26 @@ Action in mute state:: Block
Native Pattern Native Pattern
~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
The native pattern is used for communicating with TCP peers and allows The native pattern is used for communicating with TCP peers and allows
asynchronous requests and replies in either direction. asynchronous requests and replies in either direction.
ZMQ_STREAM ZMQ_STREAM
^^^^^^^^^^ ^^^^^^^^^^
A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a
non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
act as client and/or server, sending and/or receiving TCP data asynchronously. act as client and/or server, sending and/or receiving TCP data asynchronously.
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
containing the _identity_ of the originating peer to the message before passing containing the _identity_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all it to the application. Messages received are fair-queued from among all
connected peers. connected peers.
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _identity_ of the peer the message shall be message and use it to determine the _identity_ of the peer the message shall be
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error. routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
To open a connection to a server, use the zmq_connect call, and then fetch the To open a connection to a server, use the zmq_connect call, and then fetch the
socket identity using the ZMQ_IDENTITY zmq_getsockopt call. socket identity using the ZMQ_IDENTITY zmq_getsockopt call.
To close a specific connection, send the identity frame followed by a To close a specific connection, send the identity frame followed by a
...@@ -373,6 +337,116 @@ Incoming routing strategy:: Fair-queued ...@@ -373,6 +337,116 @@ Incoming routing strategy:: Fair-queued
Action in mute state:: EAGAIN Action in mute state:: EAGAIN
Request-reply pattern
~~~~~~~~~~~~~~~~~~~~~
The request-reply pattern is used for sending requests from a ZMQ_REQ _client_
to one or more ZMQ_REP _services_, and receiving subsequent replies to each
request sent.
The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28.
Note: this pattern will be deprecated in favor of the client-server pattern.
ZMQ_REQ
^^^^^^^
A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and
receive replies from a _service_. This socket type allows only an alternating
sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each
request sent is round-robined among all _services_, and each reply received is
matched with the last issued request.
If no services are available, then any send operation on the socket shall
block until at least one _service_ becomes available. The REQ socket shall
not discard messages.
[horizontal]
.Summary of ZMQ_REQ characteristics
Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Send, Receive, Send, Receive, ...
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Last peer
Action in mute state:: Block
ZMQ_REP
^^^^^^^
A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and
send replies to a _client_. This socket type allows only an alternating
sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each
request received is fair-queued from among all _clients_, and each reply sent
is routed to the _client_ that issued the last request. If the original
requester does not exist any more the reply is silently discarded.
[horizontal]
.Summary of ZMQ_REP characteristics
Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Receive, Send, Receive, Send, ...
Incoming routing strategy:: Fair-queued
Outgoing routing strategy:: Last peer
ZMQ_DEALER
^^^^^^^^^^
A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending
request/reply sockets. Each message sent is round-robined among all connected
peers, and each message received is fair-queued from all connected peers.
When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
linkzmq:zmq_send[3] operations on the socket shall block until the mute
state ends or at least one peer becomes available for sending; messages are not
discarded.
When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent
must consist of an empty message part, the _delimiter_, followed by one or more
_body parts_.
[horizontal]
.Summary of ZMQ_DEALER characteristics
Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: Round-robin
Incoming routing strategy:: Fair-queued
Action in mute state:: Block
ZMQ_ROUTER
^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
prepend a message part containing the _identity_ of the originating peer to the
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the
high water mark for all peers, then any messages sent to the socket shall be dropped
until the mute state ends. Likewise, any messages routed to a peer for which
the individual high water mark has been reached shall also be dropped.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty
_delimiter_ message part. Hence, the entire structure of each received message
as seen by the application becomes: one or more _identity_ parts, _delimiter_
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
[horizontal]
.Summary of ZMQ_ROUTER characteristics
Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Drop
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_socket()_ function shall return an opaque handle to the newly created The _zmq_socket()_ function shall return an opaque handle to the newly created
......
...@@ -227,8 +227,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg); ...@@ -227,8 +227,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property); ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
/******************************************************************************/ /******************************************************************************/
......
...@@ -44,9 +44,9 @@ zmq::client_t::~client_t () ...@@ -44,9 +44,9 @@ zmq::client_t::~client_t ()
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
lb.attach (pipe_); lb.attach (pipe_);
...@@ -54,28 +54,26 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -54,28 +54,26 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::client_t::xsend (msg_t *msg_) int zmq::client_t::xsend (msg_t *msg_)
{ {
zmq_assert(!(msg_->flags () & msg_t::more));
return lb.sendpipe (msg_, NULL); return lb.sendpipe (msg_, NULL);
} }
int zmq::client_t::xrecv (msg_t *msg_) int zmq::client_t::xrecv (msg_t *msg_)
{ {
int rc = fq.recvpipe (msg_, NULL); int rc = fq.recvpipe (msg_, NULL);
// Drop any messages with more flag // Drop any messages with more flag
while (rc == 0 && msg_->flags () & msg_t::more) { while (rc == 0 && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message // drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL); rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more) while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL); rc = fq.recvpipe (msg_, NULL);
// get the new message // get the new message
if (rc == 0) if (rc == 0)
rc = fq.recvpipe (msg_, NULL); rc = fq.recvpipe (msg_, NULL);
} }
return rc; return rc;
} }
......
...@@ -45,7 +45,7 @@ zmq::dealer_t::~dealer_t () ...@@ -45,7 +45,7 @@ zmq::dealer_t::~dealer_t ()
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -74,7 +74,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const ...@@ -74,7 +74,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{ {
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH", "DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"}; "XPUB", "XSUB", "STREAM",
"SERVER", "CLIENT"};
zmq_assert (socket_type >= 0 && socket_type <= 13); zmq_assert (socket_type >= 0 && socket_type <= 13);
return names [socket_type]; return names [socket_type];
} }
...@@ -190,7 +191,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const ...@@ -190,7 +191,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
case ZMQ_SERVER: case ZMQ_SERVER:
return type_ == "CLIENT"; return type_ == "CLIENT";
case ZMQ_CLIENT: case ZMQ_CLIENT:
return type_ == "CLIENT" || type_ == "SERVER"; return type_ == "SERVER";
default: default:
break; break;
} }
......
...@@ -497,18 +497,22 @@ bool zmq::msg_t::rm_refs (int refs_) ...@@ -497,18 +497,22 @@ bool zmq::msg_t::rm_refs (int refs_)
return true; return true;
} }
uint32_t zmq::msg_t::get_routing_id() uint32_t zmq::msg_t::get_routing_id ()
{ {
return u.base.routing_id; return u.base.routing_id;
} }
int zmq::msg_t::set_routing_id(uint32_t routing_id_) int zmq::msg_t::set_routing_id (uint32_t routing_id_)
{ {
u.base.routing_id = routing_id_; if (routing_id_) {
return 0; u.base.routing_id = routing_id_;
return 0;
}
errno = EINVAL;
return -1;
} }
zmq::atomic_counter_t* zmq::msg_t::refcnt() zmq::atomic_counter_t *zmq::msg_t::refcnt()
{ {
switch(u.base.type) switch(u.base.type)
{ {
......
...@@ -97,8 +97,8 @@ namespace zmq ...@@ -97,8 +97,8 @@ namespace zmq
bool is_vsm () const; bool is_vsm () const;
bool is_cmsg () const; bool is_cmsg () const;
bool is_zcmsg() const; bool is_zcmsg() const;
uint32_t get_routing_id(); uint32_t get_routing_id ();
int set_routing_id(uint32_t routing_id_); int set_routing_id (uint32_t routing_id_);
// After calling this function you can copy the message in POD-style // After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy. // refs_ times. No need to call copy.
......
...@@ -48,7 +48,7 @@ zmq::pair_t::~pair_t () ...@@ -48,7 +48,7 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_ != NULL); zmq_assert (pipe_ != NULL);
......
...@@ -85,8 +85,8 @@ namespace zmq ...@@ -85,8 +85,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_); void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an routing ID to be used by its clients. // Pipe endpoint can store an routing ID to be used by its clients.
void set_routing_id(uint32_t routing_id_); void set_routing_id (uint32_t routing_id_);
uint32_t get_routing_id(); uint32_t get_routing_id ();
// Pipe endpoint can store an opaque ID to be used by its clients. // Pipe endpoint can store an opaque ID to be used by its clients.
void set_identity (const blob_t &identity_); void set_identity (const blob_t &identity_);
......
...@@ -45,7 +45,7 @@ zmq::pull_t::~pull_t () ...@@ -45,7 +45,7 @@ zmq::pull_t::~pull_t ()
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
......
...@@ -45,7 +45,7 @@ zmq::push_t::~push_t () ...@@ -45,7 +45,7 @@ zmq::push_t::~push_t ()
void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
// Don't delay pipe termination as there is no one // Don't delay pipe termination as there is no one
// to receive the delimiter. // to receive the delimiter.
......
...@@ -45,7 +45,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -45,7 +45,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_rid (generate_random ()), next_rid (generate_random ()),
mandatory (false), mandatory (false),
// raw_socket functionality in ROUTER is deprecated // raw_socket functionality in ROUTER is deprecated
raw_socket (false), raw_socket (false),
probe_router (false), probe_router (false),
handover (false) handover (false)
{ {
...@@ -67,7 +67,7 @@ zmq::router_t::~router_t () ...@@ -67,7 +67,7 @@ zmq::router_t::~router_t ()
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
...@@ -104,6 +104,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -104,6 +104,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_ROUTER_RAW: case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) { if (is_int && value >= 0) {
raw_socket = (value != 0); raw_socket = (value != 0);
...@@ -128,8 +129,8 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -128,8 +129,8 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_ROUTER_HANDOVER: case ZMQ_ROUTER_HANDOVER:
if (is_int && value >= 0) { if (is_int && value >= 0) {
handover = (value != 0); handover = (value != 0);
return 0; return 0;
...@@ -409,10 +410,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -409,10 +410,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
connect_rid.length()); connect_rid.length());
connect_rid.clear (); connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) if (it != outpipes.end ())
zmq_assert(false); // Not allowed to duplicate an existing rid zmq_assert(false); // Not allowed to duplicate an existing rid
} }
else else
if (options.raw_socket) { // Always assign identity for raw-socket if (options.raw_socket) { // Always assign identity for raw-socket
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
...@@ -420,7 +421,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -420,7 +421,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
} }
else else
if (!options.raw_socket) { if (!options.raw_socket) {
// Pick up handshake cases and also case where next identity is set // Pick up handshake cases and also case where next identity is set
msg.init (); msg.init ();
ok = pipe_->read (&msg); ok = pipe_->read (&msg);
...@@ -446,7 +447,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -446,7 +447,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
return false; return false;
else { else {
// We will allow the new connection to take over this // We will allow the new connection to take over this
// identity. Temporarily assign a new identity to the // identity. Temporarily assign a new identity to the
// existing pipe so we can terminate it asynchronously. // existing pipe so we can terminate it asynchronously.
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
...@@ -454,13 +455,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -454,13 +455,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
blob_t new_identity = blob_t (buf, sizeof buf); blob_t new_identity = blob_t (buf, sizeof buf);
it->second.pipe->set_identity (new_identity); it->second.pipe->set_identity (new_identity);
outpipe_t existing_outpipe = outpipe_t existing_outpipe =
{it->second.pipe, it->second.active}; {it->second.pipe, it->second.active};
ok = outpipes.insert (outpipes_t::value_type ( ok = outpipes.insert (outpipes_t::value_type (
new_identity, existing_outpipe)).second; new_identity, existing_outpipe)).second;
zmq_assert (ok); zmq_assert (ok);
// Remove the existing identity entry to allow the new // Remove the existing identity entry to allow the new
// connection to take the identity. // connection to take the identity.
outpipes.erase (it); outpipes.erase (it);
......
...@@ -39,28 +39,31 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -39,28 +39,31 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true), socket_base_t (parent_, tid_, sid_, true),
next_rid (generate_random ()) next_rid (generate_random ())
{ {
options.type = ZMQ_SERVER; options.type = ZMQ_SERVER;
} }
zmq::server_t::~server_t () zmq::server_t::~server_t ()
{ {
zmq_assert (outpipes.empty ()); zmq_assert (outpipes.empty ());
} }
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
uint32_t routing_id = next_rid++; uint32_t routing_id = next_rid++;
if (!routing_id)
routing_id = next_rid++; // Never use RID zero
pipe_->set_routing_id (routing_id); pipe_->set_routing_id (routing_id);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true}; outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second; bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
zmq_assert (ok); zmq_assert (ok);
fq.attach (pipe_); fq.attach (pipe_);
} }
void zmq::server_t::xpipe_terminated (pipe_t *pipe_) void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
...@@ -68,12 +71,12 @@ void zmq::server_t::xpipe_terminated (pipe_t *pipe_) ...@@ -68,12 +71,12 @@ void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ()); outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
zmq_assert (it != outpipes.end ()); zmq_assert (it != outpipes.end ());
outpipes.erase (it); outpipes.erase (it);
fq.pipe_terminated (pipe_); fq.pipe_terminated (pipe_);
} }
void zmq::server_t::xread_activated (pipe_t *pipe_) void zmq::server_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::server_t::xwrite_activated (pipe_t *pipe_) void zmq::server_t::xwrite_activated (pipe_t *pipe_)
...@@ -90,20 +93,18 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_) ...@@ -90,20 +93,18 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_)
int zmq::server_t::xsend (msg_t *msg_) int zmq::server_t::xsend (msg_t *msg_)
{ {
zmq_assert(!(msg_->flags () & msg_t::more)); // Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id ();
// Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id();
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ()) { if (it != outpipes.end ()) {
if (!it->second.pipe->check_write ()) { if (!it->second.pipe->check_write ()) {
it->second.active = false; it->second.active = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
} }
else { else {
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
return -1; return -1;
} }
...@@ -113,10 +114,11 @@ int zmq::server_t::xsend (msg_t *msg_) ...@@ -113,10 +114,11 @@ int zmq::server_t::xsend (msg_t *msg_)
// Message failed to send - we must close it ourselves. // Message failed to send - we must close it ourselves.
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} else {
it->second.pipe->flush ();
} }
else
it->second.pipe->flush ();
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -125,7 +127,7 @@ int zmq::server_t::xsend (msg_t *msg_) ...@@ -125,7 +127,7 @@ int zmq::server_t::xsend (msg_t *msg_)
} }
int zmq::server_t::xrecv (msg_t *msg_) int zmq::server_t::xrecv (msg_t *msg_)
{ {
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe); int rc = fq.recvpipe (msg_, &pipe);
...@@ -134,22 +136,22 @@ int zmq::server_t::xrecv (msg_t *msg_) ...@@ -134,22 +136,22 @@ int zmq::server_t::xrecv (msg_t *msg_)
// drop all frames of the current multi-frame message // drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL); rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more) while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL); rc = fq.recvpipe (msg_, NULL);
// get the new message // get the new message
if (rc == 0) if (rc == 0)
rc = fq.recvpipe (msg_, &pipe); rc = fq.recvpipe (msg_, &pipe);
} }
if (rc != 0) if (rc != 0)
return rc; return rc;
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_routing_id(); uint32_t routing_id = pipe->get_routing_id ();
msg_->set_routing_id(routing_id); msg_->set_routing_id (routing_id);
return 0; return 0;
} }
......
...@@ -95,13 +95,13 @@ void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) ...@@ -95,13 +95,13 @@ void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_) void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_)
{ {
// These options are used only under certain #ifdefs below. // These options are used only under certain #ifdefs below.
LIBZMQ_UNUSED(keepalive_); LIBZMQ_UNUSED (keepalive_);
LIBZMQ_UNUSED(keepalive_cnt_); LIBZMQ_UNUSED (keepalive_cnt_);
LIBZMQ_UNUSED(keepalive_idle_); LIBZMQ_UNUSED (keepalive_idle_);
LIBZMQ_UNUSED(keepalive_intvl_); LIBZMQ_UNUSED (keepalive_intvl_);
// If none of the #ifdefs apply, then s_ is unused. // If none of the #ifdefs apply, then s_ is unused.
LIBZMQ_UNUSED(s_); LIBZMQ_UNUSED (s_);
// Tuning TCP keep-alives if platform allows it // Tuning TCP keep-alives if platform allows it
// All values = -1 means skip and leave it for OS // All values = -1 means skip and leave it for OS
......
...@@ -57,7 +57,7 @@ ...@@ -57,7 +57,7 @@
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{ {
// TODO: Unused parameter, IPv6 support not implemented for Solaris. // TODO: Unused parameter, IPv6 support not implemented for Solaris.
LIBZMQ_UNUSED(ipv6_); LIBZMQ_UNUSED (ipv6_);
// Create a socket. // Create a socket.
const int fd = open_socket (AF_INET, SOCK_DGRAM, 0); const int fd = open_socket (AF_INET, SOCK_DGRAM, 0);
...@@ -124,7 +124,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_ ...@@ -124,7 +124,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{ {
// TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX. // TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX.
LIBZMQ_UNUSED(ipv6_); LIBZMQ_UNUSED (ipv6_);
// Create a socket. // Create a socket.
const int sd = open_socket (AF_INET, SOCK_DGRAM, 0); const int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
...@@ -211,8 +211,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_ ...@@ -211,8 +211,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
// This is true especially of Windows. // This is true especially of Windows.
int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_)
{ {
LIBZMQ_UNUSED(nic_); LIBZMQ_UNUSED (nic_);
LIBZMQ_UNUSED(ipv6_); LIBZMQ_UNUSED (ipv6_);
errno = ENODEV; errno = ENODEV;
return -1; return -1;
......
...@@ -56,7 +56,7 @@ zmq::xsub_t::~xsub_t () ...@@ -56,7 +56,7 @@ zmq::xsub_t::~xsub_t ()
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
......
...@@ -686,19 +686,19 @@ int zmq_msg_set (zmq_msg_t *, int, int) ...@@ -686,19 +686,19 @@ int zmq_msg_set (zmq_msg_t *, int, int)
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_) int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{ {
return ((zmq::msg_t*) msg_)->set_routing_id(routing_id_); return ((zmq::msg_t *) msg_)->set_routing_id (routing_id_);
} }
uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg_) uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
{ {
return ((zmq::msg_t*) msg_)->get_routing_id(); return ((zmq::msg_t *) msg_)->get_routing_id ();
} }
// Get message metadata string // Get message metadata string
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
{ {
zmq::metadata_t *metadata = ((zmq::msg_t*) msg_)->metadata (); zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
const char *value = NULL; const char *value = NULL;
if (metadata) if (metadata)
value = metadata->get (std::string (property_)); value = metadata->get (std::string (property_));
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "testutil.hpp" #include "testutil.hpp"
int send_msg(zmq_msg_t* msg, void* s, int flags, int value); int send_msg (zmq_msg_t* msg, void* s, int flags, int value);
int main (void) int main (void)
{ {
...@@ -38,14 +38,14 @@ int main (void) ...@@ -38,14 +38,14 @@ int main (void)
assert (ctx); assert (ctx);
void *client = zmq_socket (ctx, ZMQ_CLIENT); void *client = zmq_socket (ctx, ZMQ_CLIENT);
void *dealer = zmq_socket (ctx, ZMQ_DEALER); void *server = zmq_socket (ctx, ZMQ_SERVER);
int rc; int rc;
rc = zmq_bind (client, "inproc://serverdropmore"); rc = zmq_bind (client, "inproc://serverdropmore");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (dealer, "inproc://serverdropmore"); rc = zmq_connect (server, "inproc://serverdropmore");
assert (rc == 0); assert (rc == 0);
zmq_msg_t msg; zmq_msg_t msg;
...@@ -53,34 +53,34 @@ int main (void) ...@@ -53,34 +53,34 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// we will send 2 3-frames messages and then single frame message, only last one should be received // we will send 2 3-frames messages and then single frame message, only last one should be received
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 1); rc = send_msg (&msg, client, ZMQ_SNDMORE, 1);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 2); rc = send_msg (&msg, client, ZMQ_SNDMORE, 2);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 3); rc = send_msg (&msg, client, 0, 3);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 4); rc = send_msg (&msg, client, ZMQ_SNDMORE, 4);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 5); rc = send_msg (&msg, client, ZMQ_SNDMORE, 5);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 6); rc = send_msg (&msg, client, 0, 6);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 7); rc = send_msg (&msg, client, 0, 7);
assert(rc == 1); assert(rc == 1);
rc = zmq_msg_recv (&msg, client, 0); rc = zmq_msg_recv (&msg, server, 0);
assert (rc == 1); assert (rc == 1);
assert(zmq_msg_more(&msg) == 0); assert (zmq_msg_more (&msg) == 0);
unsigned char* data = (unsigned char*)zmq_msg_data (&msg); unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
assert (data[0] == 7); assert (data [0] == 7);
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
...@@ -88,7 +88,7 @@ int main (void) ...@@ -88,7 +88,7 @@ int main (void)
rc = zmq_close (client); rc = zmq_close (client);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (dealer); rc = zmq_close (server);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
...@@ -97,20 +97,18 @@ int main (void) ...@@ -97,20 +97,18 @@ int main (void)
return 0 ; return 0 ;
} }
int send_msg(zmq_msg_t* msg, void* s, int flags, int value) int send_msg (zmq_msg_t *msg, void *s, int flags, int value)
{ {
int rc = zmq_msg_close(msg); int rc = zmq_msg_close (msg);
if (rc != 0) if (rc != 0)
return rc; return rc;
zmq_msg_init_size(msg, 1); zmq_msg_init_size (msg, 1);
if (rc != 0) if (rc != 0)
return rc; return rc;
unsigned char* data = (unsigned char*)zmq_msg_data(msg); unsigned char *data = (unsigned char *) zmq_msg_data (msg);
data[0] = (unsigned char)value; data [0] = (unsigned char) value;
return zmq_msg_send (msg, s, flags); return zmq_msg_send (msg, s, flags);
} }
...@@ -38,31 +38,29 @@ int main (void) ...@@ -38,31 +38,29 @@ int main (void)
void *server = zmq_socket (ctx, ZMQ_SERVER); void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client = zmq_socket (ctx, ZMQ_CLIENT); void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc; int rc = zmq_bind (server, "tcp://127.0.0.1:5560");
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:5560"); rc = zmq_connect (client, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init_size(&msg,1); rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0); assert (rc == 0);
char * data = (char *)zmq_msg_data(&msg); char *data = (char *) zmq_msg_data (&msg);
data[0] = 1; data [0] = 1;
rc = zmq_msg_send(&msg, client, 0); rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_msg_init(&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv(&msg, server, 0); rc = zmq_msg_recv (&msg, server, 0);
assert (rc == 1); assert (rc == 1);
uint32_t routing_id = zmq_msg_get_routing_id(&msg); uint32_t routing_id = zmq_msg_routing_id (&msg);
assert(routing_id != 0); assert (routing_id != 0);
rc = zmq_msg_close(&msg); rc = zmq_msg_close(&msg);
assert (rc == 0); assert (rc == 0);
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "testutil.hpp" #include "testutil.hpp"
int send_msg(zmq_msg_t* msg, void* s, int flags, int value); int send_msg (zmq_msg_t* msg, void* s, int flags, int value);
int main (void) int main (void)
{ {
...@@ -38,7 +38,7 @@ int main (void) ...@@ -38,7 +38,7 @@ int main (void)
assert (ctx); assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER); void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client = zmq_socket (ctx, ZMQ_DEALER); void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc; int rc;
...@@ -61,13 +61,13 @@ int main (void) ...@@ -61,13 +61,13 @@ int main (void)
rc = send_msg (&msg, client, 0, 3); rc = send_msg (&msg, client, 0, 3);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, client, ZMQ_SNDMORE, 4); rc = send_msg (&msg, client, ZMQ_SNDMORE, 4);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, client, ZMQ_SNDMORE, 5); rc = send_msg (&msg, client, ZMQ_SNDMORE, 5);
assert(rc == 1); assert(rc == 1);
rc = send_msg (&msg, client, 0, 6); rc = send_msg (&msg, client, 0, 6);
assert(rc == 1); assert(rc == 1);
...@@ -75,12 +75,12 @@ int main (void) ...@@ -75,12 +75,12 @@ int main (void)
assert(rc == 1); assert(rc == 1);
rc = zmq_msg_recv (&msg, server, 0); rc = zmq_msg_recv (&msg, server, 0);
assert (rc == 1); assert (rc == 1);
assert(zmq_msg_more(&msg) == 0); assert (zmq_msg_more (&msg) == 0);
unsigned char* data = (unsigned char*)zmq_msg_data (&msg); unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
assert (data[0] == 7); assert (data [0] == 7);
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
...@@ -97,20 +97,18 @@ int main (void) ...@@ -97,20 +97,18 @@ int main (void)
return 0 ; return 0 ;
} }
int send_msg(zmq_msg_t* msg, void* s, int flags, int value) int send_msg (zmq_msg_t *msg, void *s, int flags, int value)
{ {
int rc = zmq_msg_close(msg); int rc = zmq_msg_close (msg);
if (rc != 0) if (rc != 0)
return rc; return rc;
zmq_msg_init_size(msg, 1); zmq_msg_init_size (msg, 1);
if (rc != 0) if (rc != 0)
return rc; return rc;
unsigned char* data = (unsigned char*)zmq_msg_data(msg); unsigned char *data = (unsigned char *) zmq_msg_data (msg);
data[0] = (unsigned char)value; data [0] = (unsigned char) value;
return zmq_msg_send (msg, s, flags); return zmq_msg_send (msg, s, flags);
} }
...@@ -29,71 +29,51 @@ ...@@ -29,71 +29,51 @@
#include "testutil.hpp" #include "testutil.hpp"
void worker1(void* s); // Client threads loop on send/recv until told to exit
void worker2(void* s); void client_thread (void *client)
{
char data = 0;
for (int count = 0; count < 100000; count++) {
int rc = zmq_send (client, &data, 1, 0);
assert (rc == 1);
}
data = 1;
int rc = zmq_send (client, &data, 1, 0);
assert (rc == 1);
}
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment ();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *client = zmq_socket (ctx, ZMQ_CLIENT); void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client2 = zmq_socket (ctx, ZMQ_CLIENT); int rc = zmq_bind (server, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
int thread_safe; int thread_safe;
size_t size = sizeof(int); size_t size = sizeof (int);
zmq_getsockopt (client, ZMQ_THREAD_SAFE, &thread_safe, &size); zmq_getsockopt (client, ZMQ_THREAD_SAFE, &thread_safe, &size);
assert (thread_safe == 1); assert (thread_safe == 1);
rc = zmq_connect (client, "tcp://127.0.0.1:5560");
int rc;
rc = zmq_bind (client, "tcp://127.0.0.1:5560");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (client2, "tcp://127.0.0.1:5560"); void *t1 = zmq_threadstart (client_thread, client);
assert (rc == 0); void *t2 = zmq_threadstart (client_thread, client);
void* t1 = zmq_threadstart(worker1, client2);
void* t2 = zmq_threadstart(worker2, client2);
char data[1];
data[0] = 0;
for (int i=0; i < 10; i++) {
rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1);
char a, b;
rc = zmq_recv(client, &a, 1, 0);
assert(rc == 1);
rc = zmq_recv(client, &b, 1, 0); char data;
assert(rc == 1); int threads_completed = 0;
while (threads_completed < 2) {
// make sure they came from different threads zmq_recv (server, &data, 1, 0);
assert((a == 1 && b == 2) || (a == 2 && b == 1)); if (data == 1)
threads_completed++; // Thread ended
} }
zmq_threadclose (t1);
zmq_threadclose (t2);
// make the thread exit rc = zmq_close (server);
data[0] = 1;
rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1);
zmq_threadclose(t1);
zmq_threadclose(t2);
rc = zmq_close (client2);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (client); rc = zmq_close (client);
...@@ -104,59 +84,3 @@ int main (void) ...@@ -104,59 +84,3 @@ int main (void)
return 0 ; return 0 ;
} }
void worker1(void* s)
{
const char worker_id = 1;
char c;
while (true)
{
int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1);
if (c == 0)
{
msleep(100);
rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1);
}
else
{
// we got exit request
break;
}
}
}
void worker2(void* s)
{
const char worker_id = 2;
char c;
while (true)
{
int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1);
assert(c == 1 || c == 0);
if (c == 0)
{
msleep(100);
rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1);
}
else
{
// we got exit request
break;
}
}
}
...@@ -32,7 +32,11 @@ ...@@ -32,7 +32,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../src/stdint.hpp" #include "../src/stdint.hpp"
#include "platform.hpp" #ifdef USING_CMAKE
# include "platform.hpp"
#else
# include "../src/platform.hpp"
#endif
// This defines the settle time used in tests; raise this if we // This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not // get test failures on slower systems due to binds/connects not
...@@ -60,7 +64,6 @@ ...@@ -60,7 +64,6 @@
// Bounce a message from client to server and back // Bounce a message from client to server and back
// For REQ/REP or DEALER/DEALER pairs only // For REQ/REP or DEALER/DEALER pairs only
void void
bounce (void *server, void *client) bounce (void *server, void *client)
{ {
...@@ -116,7 +119,6 @@ bounce (void *server, void *client) ...@@ -116,7 +119,6 @@ bounce (void *server, void *client)
// Same as bounce, but expect messages to never arrive // Same as bounce, but expect messages to never arrive
// for security or subscriber reasons. // for security or subscriber reasons.
void void
expect_bounce_fail (void *server, void *client) expect_bounce_fail (void *server, void *client)
{ {
...@@ -193,7 +195,9 @@ const char *SEQ_END = (const char *) 1; ...@@ -193,7 +195,9 @@ const char *SEQ_END = (const char *) 1;
// Sends a message composed of frames that are C strings or null frames. // Sends a message composed of frames that are C strings or null frames.
// The list must be terminated by SEQ_END. // The list must be terminated by SEQ_END.
// Example: s_send_seq (req, "ABC", 0, "DEF", SEQ_END); // Example: s_send_seq (req, "ABC", 0, "DEF", SEQ_END);
void s_send_seq (void *socket, ...)
void
s_send_seq (void *socket, ...)
{ {
va_list ap; va_list ap;
va_start (ap, socket); va_start (ap, socket);
...@@ -222,7 +226,9 @@ void s_send_seq (void *socket, ...) ...@@ -222,7 +226,9 @@ void s_send_seq (void *socket, ...)
// the given data which can be either C strings or 0 for a null frame. // the given data which can be either C strings or 0 for a null frame.
// The list must be terminated by SEQ_END. // The list must be terminated by SEQ_END.
// Example: s_recv_seq (rep, "ABC", 0, "DEF", SEQ_END); // Example: s_recv_seq (rep, "ABC", 0, "DEF", SEQ_END);
void s_recv_seq (void *socket, ...)
void
s_recv_seq (void *socket, ...)
{ {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
...@@ -260,7 +266,8 @@ void s_recv_seq (void *socket, ...) ...@@ -260,7 +266,8 @@ void s_recv_seq (void *socket, ...)
// Sets a zero linger period on a socket and closes it. // Sets a zero linger period on a socket and closes it.
void close_zero_linger (void *socket) void
close_zero_linger (void *socket)
{ {
int linger = 0; int linger = 0;
int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof(linger)); int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof(linger));
...@@ -269,7 +276,8 @@ void close_zero_linger (void *socket) ...@@ -269,7 +276,8 @@ void close_zero_linger (void *socket)
assert (rc == 0); assert (rc == 0);
} }
void setup_test_environment() void
setup_test_environment (void)
{ {
#if defined _WIN32 #if defined _WIN32
# if defined _MSC_VER # if defined _MSC_VER
...@@ -296,8 +304,11 @@ void setup_test_environment() ...@@ -296,8 +304,11 @@ void setup_test_environment()
} }
// Provide portable millisecond sleep // Provide portable millisecond sleep
// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for // http://www.cplusplus.com/forum/unices/60161/
void msleep (int milliseconds) // http://en.cppreference.com/w/cpp/thread/sleep_for
void
msleep (int milliseconds)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
Sleep (milliseconds); Sleep (milliseconds);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment