Commit 93529d8c authored by Chuck Remes's avatar Chuck Remes Committed by Martin Sustrik

Add zmq_getmsgopt to the API

The new function allows to retrieve options (flags)
from zmq_msg_t.
Signed-off-by: 's avatarChuck Remes <cremes@mac.com>
Renamed from zmq_msg_flags to zmq_getmsgopt
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent bb66f3cc
...@@ -32,6 +32,7 @@ tests/test_reqrep_device ...@@ -32,6 +32,7 @@ tests/test_reqrep_device
tests/test_reqrep_drop tests/test_reqrep_drop
tests/test_sub_forward tests/test_sub_forward
tests/test_invalid_rep tests/test_invalid_rep
tests/test_msg_flags
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat
......
...@@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ ...@@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_sendmsg.3 zmq_recvmsg.3 zmq_getmsgopt.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
MAN_DOC = $(MAN1) $(MAN3) $(MAN7) MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
......
zmq_getmsgopt(3)
================
NAME
----
zmq_getmsgopt - retrieve message option
SYNOPSIS
--------
*int zmq_getmsgopt (zmq_msg_t '*message', int 'option_name', void '*option_value', size_t '*option_len');*
DESCRIPTION
-----------
The _zmq_getmsgopt()_ function shall retrieve the value for the option
specified by the 'option_name' argument for the message pointed to by the
'message' argument, and store it in the buffer pointed to by the 'option_value'
argument. The 'option_len' argument is the size in bytes of the buffer pointed
to by 'option_value'; upon successful completion _zmq_getsockopt()_ shall
modify the 'option_len' argument to indicate the actual size of the option
value stored in the buffer.
The following options can be retrieved with the _zmq_getmsgopt()_ function:
*ZMQ_MORE*::
Indicates that there are more message parts to follow after the 'message'.
RETURN VALUE
------------
The _zmq_getmsgopt()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The requested option _option_name_ is unknown, or the requested _option_size_ or
_option_value_ is invalid, or the size of the buffer pointed to by
_option_value_, as specified by _option_len_, is insufficient for storing the
option value.
EXAMPLE
-------
.Receiving a multi-part message
----
zmq_msg_t part;
int more;
size_t more_size = sizeof (more);
while (true) {
/* Create an empty 0MQ message to hold the message part */
int rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from socket */
rc = zmq_recvmsg (socket, &part, 0);
assert (rc != -1);
rc = getmsgopt (&part, ZMQ_MORE, &more, &more_size);
assert (rc == 0);
if (more) {
fprintf (stderr, "more\n");
}
else {
fprintf (stderr, "end\n");
break;
}
zmq_msg_close (part);
}
----
SEE ALSO
--------
linkzmq:zmq_msg_data[3]
linkzmq:zmq_msg_init[3]
linkzmq:zmq_msg_init_size[3]
linkzmq:zmq_msg_init_data[3]
linkzmq:zmq_msg_close[3]
linkzmq:zmq[7]
AUTHORS
-------
The 0MQ documentation was written by Chuck Remes <cremes@mac.com>.
...@@ -139,6 +139,8 @@ ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ...@@ -139,6 +139,8 @@ ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg); ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg);
ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg); ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval,
size_t *optvallen);
/******************************************************************************/ /******************************************************************************/
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ /* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
...@@ -192,6 +194,9 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -192,6 +194,9 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SNDTIMEO 28 #define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31 #define ZMQ_IPV4ONLY 31
/* Message options */
#define ZMQ_MORE 1
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2 #define ZMQ_SNDMORE 2
......
...@@ -485,6 +485,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -485,6 +485,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0)) if (unlikely (rc != 0))
return -1; return -1;
// Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);
// At this point we impose the flags on the message. // At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDMORE) if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
...@@ -857,15 +860,10 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) ...@@ -857,15 +860,10 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_) void zmq::socket_base_t::extract_flags (msg_t *msg_)
{ {
// Test whether IDENTITY flag is valid for this socket type. // Test whether IDENTITY flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::identity)) { if (unlikely (msg_->flags () & msg_t::identity))
zmq_assert (options.recv_identity); zmq_assert (options.recv_identity);
printf ("identity recvd\n");
}
// Remove MORE flag. // Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false; rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
} }
...@@ -340,6 +340,25 @@ size_t zmq_msg_size (zmq_msg_t *msg_) ...@@ -340,6 +340,25 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
return ((zmq::msg_t*) msg_)->size (); return ((zmq::msg_t*) msg_)->size ();
} }
int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
size_t *optvallen_)
{
switch (option_) {
case ZMQ_MORE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) =
(((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
*optvallen_ = sizeof (int);
return 0;
default:
errno = EINVAL;
return -1;
}
}
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{ {
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
......
...@@ -8,7 +8,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -8,7 +8,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_hwm \ test_hwm \
test_reqrep_device \ test_reqrep_device \
test_sub_forward \ test_sub_forward \
test_invalid_rep test_invalid_rep \
test_msg_flags
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -25,6 +26,7 @@ test_hwm_SOURCES = test_hwm.cpp ...@@ -25,6 +26,7 @@ test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp test_reqrep_device_SOURCES = test_reqrep_device.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
/*
Copyright (c) 2011 250bpm s.r.o.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
// Create the infrastructure
void *ctx = zmq_init (0);
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_XREP);
assert (sb);
int rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_XREQ);
assert (sc);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
// Send 2-part message.
rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
assert (rc == 1);
rc = zmq_send (sc, "B", 1, 0);
assert (rc == 1);
// Identity comes first.
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recvmsg (sb, &msg, 0);
assert (rc >= 0);
int more;
size_t more_size = sizeof (more);
rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
assert (rc == 0);
assert (more == 1);
// Then the first part of the message body.
rc = zmq_recvmsg (sb, &msg, 0);
assert (rc == 1);
more_size = sizeof (more);
rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
assert (rc == 0);
assert (more == 1);
// And finally, the second part of the message body.
rc = zmq_recvmsg (sb, &msg, 0);
assert (rc == 1);
more_size = sizeof (more);
rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
assert (rc == 0);
assert (more == 0);
// Deallocate the infrastructure.
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment