Commit 531c6af0 authored by Martin Sustrik's avatar Martin Sustrik

message flags added to zmq_msg_t strcuture

parent 96ccc1c5
...@@ -102,15 +102,19 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum); ...@@ -102,15 +102,19 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
#define ZMQ_DELIMITER 31 #define ZMQ_DELIMITER 31
#define ZMQ_VSM 32 #define ZMQ_VSM 32
// A message. If 'shared' is true, message content pointed to by 'content' // Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag
// is shared, i.e. reference counting is used to manage its lifetime // (it has no equivalent in the wire format), however, making it a flag
// rather than straighforward malloc/free. Not that 'content' is not a pointer // allows us to pack the stucture tigher and thus improve performance.
// to the raw data. Rather it is pointer to zmq::msg_content_t structure #define ZMQ_MSG_TBC 1
#define ZMQ_MSG_SHARED 128
// A message. Note that 'content' is not a pointer to the raw data.
// Rather it is pointer to zmq::msg_content_t structure
// (see src/msg_content.hpp for its definition). // (see src/msg_content.hpp for its definition).
typedef struct typedef struct
{ {
void *content; void *content;
unsigned char shared; unsigned char flags;
unsigned char vsm_size; unsigned char vsm_size;
unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]; unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
} zmq_msg_t; } zmq_msg_t;
......
...@@ -174,7 +174,7 @@ void zmq::writer_t::term () ...@@ -174,7 +174,7 @@ void zmq::writer_t::term ()
zmq_msg_t msg; zmq_msg_t msg;
const unsigned char *offset = 0; const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER); msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.shared = false; msg.flags = 0;
pipe->write (msg); pipe->write (msg);
pipe->flush (); pipe->flush ();
} }
......
...@@ -121,11 +121,11 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -121,11 +121,11 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// There are at least 2 destinations for the message. That means we have // There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to // to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1). // the content (we are holding one reference anyway, that's why -1).
if (msg_->shared) if (msg_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (pipes_count - 1); content->refcnt.add (pipes_count - 1);
else { else {
content->refcnt.set (pipes_count); content->refcnt.set (pipes_count);
msg_->shared = true; msg_->flags |= ZMQ_MSG_SHARED;
} }
// Push the message to all destinations. // Push the message to all destinations.
......
...@@ -96,6 +96,7 @@ const char *zmq_strerror (int errnum_) ...@@ -96,6 +96,7 @@ const char *zmq_strerror (int errnum_)
int zmq_msg_init (zmq_msg_t *msg_) int zmq_msg_init (zmq_msg_t *msg_)
{ {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->flags = 0;
msg_->vsm_size = 0; msg_->vsm_size = 0;
return 0; return 0;
} }
...@@ -104,6 +105,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) ...@@ -104,6 +105,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{ {
if (size_ <= ZMQ_MAX_VSM_SIZE) { if (size_ <= ZMQ_MAX_VSM_SIZE) {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM; msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->flags = 0;
msg_->vsm_size = (uint8_t) size_; msg_->vsm_size = (uint8_t) size_;
} }
else { else {
...@@ -113,8 +115,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) ...@@ -113,8 +115,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
msg_->shared = 0; msg_->flags = 0;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = (void*) (content + 1); content->data = (void*) (content + 1);
content->size = size_; content->size = size_;
...@@ -128,9 +130,9 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) ...@@ -128,9 +130,9 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_, void *hint_) zmq_free_fn *ffn_, void *hint_)
{ {
msg_->shared = 0;
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
zmq_assert (msg_->content); zmq_assert (msg_->content);
msg_->flags = 0;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = data_; content->data = data_;
content->size = size_; content->size = size_;
...@@ -150,7 +152,7 @@ int zmq_msg_close (zmq_msg_t *msg_) ...@@ -150,7 +152,7 @@ int zmq_msg_close (zmq_msg_t *msg_)
// If the content is not shared, or if it is shared and the reference. // If the content is not shared, or if it is shared and the reference.
// count has dropped to zero, deallocate it. // count has dropped to zero, deallocate it.
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
if (!msg_->shared || !content->refcnt.sub (1)) { if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference. // We used "placement new" operator to initialize the reference.
// counter so we call its destructor now. // counter so we call its destructor now.
...@@ -183,10 +185,10 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) ...@@ -183,10 +185,10 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
// One reference is added to shared messages. Non-shared messages // One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2. // are turned into shared messages and reference count is set to 2.
zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
if (src_->shared) if (src_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (1); content->refcnt.add (1);
else { else {
src_->shared = true; src_->flags |= ZMQ_MSG_SHARED;
content->refcnt.set (2); content->refcnt.set (2);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment