Commit 2f44faa7 authored by Mikko Koppanen's avatar Mikko Koppanen

Merge pull request #247 from pieterh/sendrecv

Added zmq_msg_send/recv functions
parents 2b646cbf fb4748f2
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3
......
...@@ -77,6 +77,10 @@ Initialise a message:: ...@@ -77,6 +77,10 @@ Initialise a message::
linkzmq:zmq_msg_init_size[3] linkzmq:zmq_msg_init_size[3]
linkzmq:zmq_msg_init_data[3] linkzmq:zmq_msg_init_data[3]
Sending and receiving a message::
linkzmq:zmq_msg_send[3]
linkzmq:zmq_msg_recv[3]
Release a message:: Release a message::
linkzmq:zmq_msg_close[3] linkzmq:zmq_msg_close[3]
......
zmq_msg_recv(3)
===============
NAME
----
zmq_msg_recv - receive a message part from a socket
SYNOPSIS
--------
*int zmq_msg_recv (zmq_msg_t '*msg', void '*socket', int 'flags');*
DESCRIPTION
-----------
The _zmq_msg_send()_ function is identical to linkzmq:zmq_sendmsg[3], which
shall be deprecated in future versions. _zmq_msg_send()_ is more consistent
with other message manipulation functions.
The _zmq_msg_recv()_ function shall receive a message part from the socket
referenced by the 'socket' argument and store it in the message referenced by
the 'msg' argument. Any content previously stored in 'msg' shall be properly
deallocated. If there are no message parts available on the specified 'socket'
the _zmq_msg_recv()_ function shall block until the request can be satisfied.
The 'flags' argument is a combination of the flags defined below:
*ZMQ_DONTWAIT*::
Specifies that the operation should be performed in non-blocking mode. If there
are no messages available on the specified 'socket', the _zmq_msg_recv()_
function shall fail with 'errno' set to EAGAIN.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages: peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
by available memory.
An application that processes multi-part messages must use the _ZMQ_RCVMORE_
linkzmq:zmq_getsockopt[3] option after calling _zmq_msg_recv()_ to determine if
there are further parts to receive.
RETURN VALUE
------------
The _zmq_msg_recv()_ function shall return number of bytes in the message
if successful. Otherwise it shall return `-1` and set 'errno' to one of the
values defined below.
ERRORS
------
*EAGAIN*::
Non-blocking mode was requested and no messages are available at the moment.
*ENOTSUP*::
The _zmq_msg_recv()_ operation is not supported by this socket type.
*EFSM*::
The _zmq_msg_recv()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state. This error may occur with
socket types that switch between several states, such as ZMQ_REP. See the
_messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
*EINTR*::
The operation was interrupted by delivery of a signal before a message was
available.
*EFAULT*::
The message passed to the function was invalid.
EXAMPLE
-------
.Receiving a message from a socket
----
/* Create an empty 0MQ message */
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
/* Block until a message is available to be received from socket */
rc = zmq_msg_recv (&msg, socket, 0);
assert (rc != -1);
/* Release message */
zmq_msg_close (&msg);
----
.Receiving a multi-part message
----
int64_t more;
size_t more_size = sizeof more;
do {
/* Create an empty 0MQ message to hold the message part */
zmq_msg_t part;
int rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from socket */
rc = zmq_msg_recv (&part, socket, 0);
assert (rc != -1);
/* Determine if more message parts are to follow */
rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
assert (rc == 0);
zmq_msg_close (&part);
} while (more);
----
SEE ALSO
--------
linkzmq:zmq_recv[3]
linkzmq:zmq_send[3]
linkzmq:zmq_msg_send[3]
linkzmq:zmq_getsockopt[3]
linkzmq:zmq_socket[7]
linkzmq:zmq[7]
AUTHORS
-------
This man page was written by Martin Sustrik <sustrik@250bpm.com>, Martin
Lucina <martin@lucina.net>, and Pieter Hintjens <ph@imatix.com>.
zmq_msg_send(3)
===============
NAME
----
zmq_msg_send - send a message part on a socket
SYNOPSIS
--------
*int zmq_msg_send (zmq_msg_t '*msg', void '*socket', int 'flags');*
DESCRIPTION
-----------
The _zmq_msg_send()_ function is identical to linkzmq:zmq_sendmsg[3], which
shall be deprecated in future versions. _zmq_msg_send()_ is more consistent
with other message manipulation functions.
The _zmq_msg_send()_ function shall queue the message referenced by the 'msg'
argument to be sent to the socket referenced by the 'socket' argument. The
'flags' argument is a combination of the flags defined below:
*ZMQ_DONTWAIT*::
Specifies that the operation should be performed in non-blocking mode. If the
message cannot be queued on the 'socket', the _zmq_msg_send()_ function shall
fail with 'errno' set to EAGAIN.
*ZMQ_SNDMORE*::
Specifies that the message being sent is a multi-part message, and that further
message parts are to follow. Refer to the section regarding multi-part messages
below for a detailed description.
The _zmq_msg_t_ structure passed to _zmq_msg_send()_ is nullified during the
call. If you want to send the same message to multiple sockets you have to copy
it using (e.g. using _zmq_msg_copy()_).
NOTE: A successful invocation of _zmq_msg_send()_ does not indicate that the
message has been transmitted to the network, only that it has been queued on
the 'socket' and 0MQ has assumed responsibility for the message.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages: peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
by available memory.
An application that sends multi-part messages must use the _ZMQ_SNDMORE_ flag
when sending each message part except the final one.
RETURN VALUE
------------
The _zmq_msg_send()_ function shall return number of bytes in the message
if successful. Otherwise it shall return `-1` and set 'errno' to one of the
values defined below.
ERRORS
------
*EAGAIN*::
Non-blocking mode was requested and the message cannot be sent at the moment.
*ENOTSUP*::
The _zmq_msg_send()_ operation is not supported by this socket type.
*EFSM*::
The _zmq_msg_send()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state. This error may occur with
socket types that switch between several states, such as ZMQ_REP. See the
_messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
*EINTR*::
The operation was interrupted by delivery of a signal before the message was
sent.
*EFAULT*::
Invalid message.
*ECANTROUTE*::
Message cannot be routed to the destination specified as the peer is either
dead or disconnected. This error makes sense only with ZMQ_ROUTER socket.
EXAMPLE
-------
.Filling in a message and sending it to a socket
----
/* Create a new message, allocating 6 bytes for message content */
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, 6);
assert (rc == 0);
/* Fill in message content with 'AAAAAA' */
memset (zmq_msg_data (&msg), 'A', 6);
/* Send the message to the socket */
rc = zmq_msg_send (&msg, socket, 0);
assert (rc == 6);
----
.Sending a multi-part message
----
/* Send a multi-part message consisting of three parts to socket */
rc = zmq_msg_send (&part1, socket, ZMQ_SNDMORE);
rc = zmq_msg_send (&part2, socket, ZMQ_SNDMORE);
/* Final part; no more parts to follow */
rc = zmq_msg_send (&part3, socket, 0);
----
SEE ALSO
--------
linkzmq:zmq_recv[3]
linkzmq:zmq_send[3]
linkzmq:zmq_msg_recv[3]
linkzmq:zmq_socket[7]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Martin Sustrik <sustrik@250bpm.com> and
Martin Lucina <mato@kotelna.sk>, and Pieter Hintjens <ph@imatix.com>.
...@@ -161,6 +161,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); ...@@ -161,6 +161,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn, void *hint); size_t size, zmq_free_fn *ffn, void *hint);
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, zmq_socket_t s, int flags);
ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, zmq_socket_t s, int flags);
ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
...@@ -245,9 +247,11 @@ ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr); ...@@ -245,9 +247,11 @@ ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr);
ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr); ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr);
ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
/* Experimental */
ZMQ_EXPORT int zmq_sendv (zmq_socket_t s, struct iovec *iov, size_t count, int flags); ZMQ_EXPORT int zmq_sendv (zmq_socket_t s, struct iovec *iov, size_t count, int flags);
ZMQ_EXPORT int zmq_recvmmsg (zmq_socket_t s, struct iovec *iov, size_t *count, int flags); ZMQ_EXPORT int zmq_recvmmsg (zmq_socket_t s, struct iovec *iov, size_t *count, int flags);
......
...@@ -294,17 +294,10 @@ static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) ...@@ -294,17 +294,10 @@ static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
return sz; return sz;
} }
/* To be deprecated once zmq_msg_send() is stable */
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { return zmq_msg_send (msg_, s_, flags_);
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
if(s->thread_safe()) s->lock();
int result = inner_sendmsg (s, msg_, flags_);
if(s->thread_safe()) s->unlock();
return result;
} }
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
...@@ -387,17 +380,10 @@ static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) ...@@ -387,17 +380,10 @@ static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
return (int) zmq_msg_size (msg_); return (int) zmq_msg_size (msg_);
} }
/* To be deprecated once zmq_msg_recv() is stable */
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { return zmq_msg_recv (msg_, s_, flags_);
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
if(s->thread_safe()) s->lock();
int result = inner_recvmsg(s, msg_, flags_);
if(s->thread_safe()) s->unlock();
return result;
} }
...@@ -516,6 +502,32 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, ...@@ -516,6 +502,32 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_); return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
} }
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
if(s->thread_safe()) s->lock();
int result = inner_sendmsg (s, msg_, flags_);
if(s->thread_safe()) s->unlock();
return result;
}
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
if(s->thread_safe()) s->lock();
int result = inner_recvmsg(s, msg_, flags_);
if(s->thread_safe()) s->unlock();
return result;
}
int zmq_msg_close (zmq_msg_t *msg_) int zmq_msg_close (zmq_msg_t *msg_)
{ {
return ((zmq::msg_t*) msg_)->close (); return ((zmq::msg_t*) msg_)->close ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment