diff --git a/src/decoder_allocators.cpp b/src/decoder_allocators.cpp index 2359f015f1028190c5e9ec1997d1699b6f31732f..8014bc5168939763e89f4f2010117157c856e042 100644 --- a/src/decoder_allocators.cpp +++ b/src/decoder_allocators.cpp @@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters (static_cast <size_t> (std::ceil (static_cast <double> (max_size) / static_cast <double> (msg_t::max_vsm_size)))) { } @@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size buf(NULL), bufsize(0), max_size(bufsize_), - msg_refcnt(NULL), + msg_content(NULL), maxCounters(maxMessages) { } @@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () // allocate memory for reference counters together with reception buffer std::size_t const allocationsize = max_size + sizeof (zmq::atomic_counter_t) + - maxCounters * sizeof (zmq::atomic_counter_t); + maxCounters * sizeof (zmq::msg_t::content_t); buf = static_cast <unsigned char *> (std::malloc (allocationsize)); alloc_assert (buf); @@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () } bufsize = max_size; - msg_refcnt = reinterpret_cast <zmq::atomic_counter_t*> (buf + sizeof (atomic_counter_t) + max_size); + msg_content = reinterpret_cast <zmq::msg_t::content_t*> (buf + sizeof (atomic_counter_t) + max_size); return buf + sizeof (zmq::atomic_counter_t); } @@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release () unsigned char* b = buf; buf = NULL; bufsize = 0; - msg_refcnt = NULL; + msg_content = NULL; return b; } diff --git a/src/decoder_allocators.hpp b/src/decoder_allocators.hpp index 380452e8e2c9e7d0676ba2c1369b01011fa4620c..30cfaf1aa52d0038aaac7e1f411de867a239862a 100644 --- a/src/decoder_allocators.hpp +++ b/src/decoder_allocators.hpp @@ -34,6 +34,7 @@ #include <cstdlib> #include "atomic_counter.hpp" +#include "msg.hpp" #include "err.hpp" namespace zmq @@ -132,21 +133,21 @@ namespace zmq bufsize = new_size; } - zmq::atomic_counter_t* provide_refcnt () + zmq::msg_t::content_t* provide_content () { - return msg_refcnt; + return msg_content; } - void advance_refcnt () + void advance_content () { - msg_refcnt++; + msg_content++; } private: unsigned char* buf; std::size_t bufsize; std::size_t max_size; - zmq::atomic_counter_t* msg_refcnt; + zmq::msg_t::content_t* msg_content; std::size_t maxCounters; }; } diff --git a/src/msg.cpp b/src/msg.cpp index 9ddabaf8ccbf5a72f61c68c2bf5dd2c13c054d9b..37379fcb61f874c4e408f768d7ea7e8ace97ac4b 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -53,7 +53,7 @@ bool zmq::msg_t::check () int zmq::msg_t::init (void* data_, size_t size_, msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_) + content_t* content_) { if (size_ < max_vsm_size) { int const rc = init_size(size_); @@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_, return -1; } } - else if(refcnt_) + else if(content_) { - return init_external_storage(data_, size_, refcnt_, ffn_, hint); + return init_external_storage(content_, data_, size_, ffn_, hint); } else { @@ -122,12 +122,11 @@ int zmq::msg_t::init_size (size_t size_) return 0; } -int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, - msg_free_fn *ffn_, void *hint_) +int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_, + msg_free_fn *ffn_, void* hint_) { zmq_assert(NULL != data_); - zmq_assert(NULL != ctr); - + zmq_assert(NULL != content_); u.zclmsg.metadata = NULL; u.zclmsg.type = type_zclmsg; @@ -135,12 +134,12 @@ int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_cou u.zclmsg.routing_id = 0; u.zclmsg.fd = retired_fd; - u.zclmsg.data = data_; - u.zclmsg.size = size_; - u.zclmsg.ffn = ffn_; - u.zclmsg.hint = hint_; - u.zclmsg.refcnt = ctr; - new (u.zclmsg.refcnt) zmq::atomic_counter_t(); + u.zclmsg.content = content_; + u.zclmsg.content->data = data_; + u.zclmsg.content->size = size_; + u.zclmsg.content->ffn = ffn_; + u.zclmsg.content->hint = hint_; + new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t(); return 0; } @@ -222,19 +221,19 @@ int zmq::msg_t::close () if (is_zcmsg()) { - zmq_assert( u.zclmsg.ffn ); + zmq_assert( u.zclmsg.content->ffn ); // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. if (!(u.zclmsg.flags & msg_t::shared) || - !u.zclmsg.refcnt->sub (1)) { + !u.zclmsg.content->refcnt.sub (1)) { // We used "placement new" operator to initialize the reference // counter so we call the destructor explicitly now. - u.zclmsg.refcnt->~atomic_counter_t (); + u.zclmsg.content->refcnt.~atomic_counter_t (); - u.zclmsg.ffn (u.zclmsg.data, - u.zclmsg.hint); + u.zclmsg.content->ffn (u.zclmsg.content->data, + u.zclmsg.content->hint); } } @@ -329,7 +328,7 @@ void *zmq::msg_t::data () case type_cmsg: return u.cmsg.data; case type_zclmsg: - return u.zclmsg.data; + return u.zclmsg.content->data; default: zmq_assert (false); return NULL; @@ -347,7 +346,7 @@ size_t zmq::msg_t::size () case type_lmsg: return u.lmsg.content->size; case type_zclmsg: - return u.zclmsg.size; + return u.zclmsg.content->size; case type_cmsg: return u.cmsg.size; default: @@ -487,10 +486,10 @@ bool zmq::msg_t::rm_refs (int refs_) return false; } - if (is_zcmsg() && !u.zclmsg.refcnt->sub(refs_)) { + if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) { // storage for rfcnt is provided externally - if (u.zclmsg.ffn) { - u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint); + if (u.zclmsg.content->ffn) { + u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint); } return false; @@ -527,7 +526,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt() case type_lmsg: return &u.lmsg.content->refcnt; case type_zclmsg: - return u.zclmsg.refcnt; + return &u.zclmsg.content->refcnt; default: zmq_assert(false); return NULL; diff --git a/src/msg.hpp b/src/msg.hpp index e420b3379c8ab1e9f93a66b69c197f19ae1657f3..4a14ad82613476baa6e54a0626e90d6a8569589e 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -56,6 +56,22 @@ namespace zmq { public: + // Shared message buffer. Message data are either allocated in one + // continuous block along with this structure - thus avoiding one + // malloc/free pair or they are stored in used-supplied memory. + // In the latter case, ffn member stores pointer to the function to be + // used to deallocate the data. If the buffer is actually shared (there + // are at least 2 references to it) refcount member contains number of + // references. + struct content_t + { + void *data; + size_t size; + msg_free_fn *ffn; + void *hint; + zmq::atomic_counter_t refcnt; + }; + // Message flags. enum { @@ -71,12 +87,12 @@ namespace zmq int init (void* data, size_t size_, msg_free_fn* ffn_, void* hint, - zmq::atomic_counter_t* refcnt_ = NULL); + content_t* content_ = NULL); int init_size (size_t size_); int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); - int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, + int init_external_storage(content_t* content_, void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); int init_delimiter (); int close (); @@ -121,22 +137,6 @@ namespace zmq private: zmq::atomic_counter_t* refcnt(); - // Shared message buffer. Message data are either allocated in one - // continuous block along with this structure - thus avoiding one - // malloc/free pair or they are stored in used-supplied memory. - // In the latter case, ffn member stores pointer to the function to be - // used to deallocate the data. If the buffer is actually shared (there - // are at least 2 references to it) refcount member contains number of - // references. - struct content_t - { - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t refcnt; - }; - // Different message types. enum type_t { @@ -196,17 +196,9 @@ namespace zmq } lmsg; struct { metadata_t *metadata; - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t* refcnt; + content_t *content; unsigned char unused [msg_t_size - (sizeof (metadata_t *) + - sizeof (void*) + - sizeof (size_t) + - sizeof (msg_free_fn*) + - sizeof (void*) + - sizeof (zmq::atomic_counter_t*) + + sizeof (content_t*) + 2 + sizeof (uint32_t) + sizeof (fd_t))]; diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index 53b9a3302541908a5778b6c3fe5489c21647e775..633530ef49137e67019267cad17aeaddc35c1300 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, { int rc = in_progress.init ((unsigned char*)data_, size_, shared_message_memory_allocator::call_dec_ref, - allocator.buffer(), - allocator.provide_refcnt() ); + allocator.buffer (), + allocator.provide_content ()); // if the buffer serves as memory for a zero-copy message, release it // and allocate a new buffer in get_buffer for the next decode - if (in_progress.is_zcmsg()) { - allocator.advance_refcnt(); + if (in_progress.is_zcmsg ()) { + allocator.advance_content(); allocator.release(); } diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 09154144a10ff71c5d744a3a86b26570aedf70d3..6af74fd28d39b669e403468e50161f013dfc5a46 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p // if the message will be a large message, pass a valid refcnt memory location as well rc = in_progress.init ((unsigned char *) read_pos, static_cast <size_t> (msg_size), shared_message_memory_allocator::call_dec_ref, buffer(), - provide_refcnt ()); + provide_content ()); // For small messages, data has been copied and refcount does not have to be increased if (in_progress.is_zcmsg()) { - advance_refcnt(); + advance_content(); inc_ref(); } }