Commit 03f097a5 authored by Thomas Rodgers's avatar Thomas Rodgers

Update zmq_msg_get(ZMQ_SHARED) to return true for type_cmsg messages

parent 416ee8e7
...@@ -24,14 +24,14 @@ The following properties can be retrieved with the _zmq_msg_get()_ function: ...@@ -24,14 +24,14 @@ The following properties can be retrieved with the _zmq_msg_get()_ function:
Indicates that there are more message frames to follow after the 'message'. Indicates that there are more message frames to follow after the 'message'.
*ZMQ_SRCFD*:: *ZMQ_SRCFD*::
Returns the file descriptor of the socket the 'message' was read from. This Returns the file descriptor of the socket the 'message' was read from. This
allows application to retrieve the remote endpoint via 'getpeername(2)'. Be allows application to retrieve the remote endpoint via 'getpeername(2)'. Be
aware that the respective socket might be closed already, reused even. aware that the respective socket might be closed already, reused even.
Currently only implemented for TCP sockets. Currently only implemented for TCP sockets.
*ZMQ_SHARED*:: *ZMQ_SHARED*::
Indicates that a message has been copied and MAY share underlying storage Indicates that a message MAY share underlying storage with another copy of
with another copy of this message. this message.
RETURN VALUE RETURN VALUE
------------ ------------
......
...@@ -214,7 +214,7 @@ void *zmq_init (int io_threads_) ...@@ -214,7 +214,7 @@ void *zmq_init (int io_threads_)
return ctx; return ctx;
} }
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
int zmq_term (void *ctx_) int zmq_term (void *ctx_)
...@@ -366,7 +366,7 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -366,7 +366,7 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
errno = err; errno = err;
return -1; return -1;
} }
// Note the optimisation here. We don't close the msg object as it is // Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes. // empty anyway. This may change when implementation of zmq_msg_t changes.
return rc; return rc;
...@@ -392,7 +392,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -392,7 +392,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
errno = err; errno = err;
return -1; return -1;
} }
// Note the optimisation here. We don't close the msg object as it is // Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes. // empty anyway. This may change when implementation of zmq_msg_t changes.
return rc; return rc;
...@@ -415,7 +415,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) ...@@ -415,7 +415,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
int rc = 0; int rc = 0;
zmq_msg_t msg; zmq_msg_t msg;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
for (size_t i = 0; i < count_; ++i) { for (size_t i = 0; i < count_; ++i) {
rc = zmq_msg_init_size (&msg, a_[i].iov_len); rc = zmq_msg_init_size (&msg, a_[i].iov_len);
if (rc != 0) { if (rc != 0) {
...@@ -435,7 +435,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) ...@@ -435,7 +435,7 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
break; break;
} }
} }
return rc; return rc;
} }
// Receiving functions. // Receiving functions.
...@@ -488,7 +488,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ...@@ -488,7 +488,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
} }
// Receive a multi-part message // Receive a multi-part message
// //
// Receives up to *count_ parts of a multi-part message. // Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read. // Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read. // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
...@@ -499,7 +499,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ...@@ -499,7 +499,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
// *count_ to retrieve message parts successfully read, // *count_ to retrieve message parts successfully read,
// even if -1 is returned. // even if -1 is returned.
// //
// The iov_base* buffers of each iovec *a_ filled in by this // The iov_base* buffers of each iovec *a_ filled in by this
// function may be freed using free(). // function may be freed using free().
// TODO: this function has no man page // TODO: this function has no man page
// //
...@@ -514,11 +514,11 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -514,11 +514,11 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
size_t count = *count_; size_t count = *count_;
int nread = 0; int nread = 0;
bool recvmore = true; bool recvmore = true;
*count_ = 0; *count_ = 0;
for (size_t i = 0; recvmore && i < count; ++i) { for (size_t i = 0; recvmore && i < count; ++i) {
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init (&msg); int rc = zmq_msg_init (&msg);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -630,7 +630,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int property_) ...@@ -630,7 +630,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int property_)
// warning: int64_t to int // warning: int64_t to int
return ((zmq::msg_t*) msg_)->fd (); return ((zmq::msg_t*) msg_)->fd ();
case ZMQ_SHARED: case ZMQ_SHARED:
return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0; return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
(((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
default: default:
errno = EINVAL; errno = EINVAL;
return -1; return -1;
......
...@@ -25,19 +25,19 @@ int main (void) ...@@ -25,19 +25,19 @@ int main (void)
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_ROUTER); void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb); assert (sb);
int rc = zmq_bind (sb, "inproc://a"); int rc = zmq_bind (sb, "inproc://a");
assert (rc == 0); assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_DEALER); void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc); assert (sc);
rc = zmq_connect (sc, "inproc://a"); rc = zmq_connect (sc, "inproc://a");
assert (rc == 0); assert (rc == 0);
// Send 2-part message. // Send 2-part message.
rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
assert (rc == 1); assert (rc == 1);
...@@ -65,7 +65,7 @@ int main (void) ...@@ -65,7 +65,7 @@ int main (void)
more = zmq_msg_more (&msg); more = zmq_msg_more (&msg);
assert (more == 0); assert (more == 0);
// Test ZMQ_SHARED property // Test ZMQ_SHARED property (case 1, refcounted messages)
zmq_msg_t msg_a; zmq_msg_t msg_a;
rc = zmq_msg_init_size(&msg_a, 1024); // large enough to be a type_lmsg rc = zmq_msg_init_size(&msg_a, 1024); // large enough to be a type_lmsg
assert (rc == 0); assert (rc == 0);
...@@ -85,13 +85,31 @@ int main (void) ...@@ -85,13 +85,31 @@ int main (void)
rc = zmq_msg_get(&msg_b, ZMQ_SHARED); rc = zmq_msg_get(&msg_b, ZMQ_SHARED);
assert (rc == 1); assert (rc == 1);
// cleanup
rc = zmq_msg_close(&msg_a);
assert (rc == 0);
rc = zmq_msg_close(&msg_b);
assert (rc == 0);
// Test ZMQ_SHARED property (case 2, constant data messages)
rc = zmq_msg_init_data(&msg_a, (void*) "TEST", 5, 0, 0);
assert (rc == 0);
// Message reports as shared
rc = zmq_msg_get(&msg_a, ZMQ_SHARED);
assert (rc == 1);
// cleanup
rc = zmq_msg_close(&msg_a);
assert (rc == 0);
// Deallocate the infrastructure. // Deallocate the infrastructure.
rc = zmq_close (sc); rc = zmq_close (sc);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (sb); rc = zmq_close (sb);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
return 0 ; return 0 ;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment