Commit 51cb57e2 authored by Jens Auer's avatar Jens Auer

Fixed wrong handling of shared messages

The shared reference count was not shared but copied. msg_t cannot
store the refcnt itself but has to store a pointer to an externally
allocated (shared) refcounter. The changes to lmsg are reverted to
use content_t again. However, this introduces an allocation in v2_decoder
when creating the message which can be avoided. When allocating the reception
buffer, space is allocated for the maximum number of reference counts
(8192 / max_vsm_size = 8192/64 = 128 zmq::atomic_counter objects). This
increases the buffer by 128*sizeof(atomic_counter) = 128*4 = 512 bytes only.
When creating a message, the refcnt member is set to the address of one of the
pre-allocated atomic_counter_t objects. To do so, a new msg_t type zcmsg
is introduced because msg::copy must discriminate between the message types
when releasing memory.
parent dfe19080
...@@ -643,7 +643,7 @@ check_PROGRAMS = ${test_apps} ...@@ -643,7 +643,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases # Run the test cases
TESTS = $(test_apps) TESTS = $(test_apps)
XFAIL_TESTS = tests/test_msg_ffn XFAIL_TESTS =
if !ON_LINUX if !ON_LINUX
XFAIL_TESTS += tests/test_abstract_ipc XFAIL_TESTS += tests/test_abstract_ipc
......
...@@ -45,27 +45,35 @@ ...@@ -45,27 +45,35 @@
typedef char zmq_msg_size_check typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
// check whether the size of atomic_counter_t matches the size of the wrapped integer
// to ensure that the lsmg union is correctly aligned
typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::atomic_counter_t) == sizeof (zmq::atomic_counter_t::integer_t)) != 0) - 1];
bool zmq::msg_t::check () bool zmq::msg_t::check ()
{ {
return u.base.type >= type_min && u.base.type <= type_max; return u.base.type >= type_min && u.base.type <= type_max;
} }
int zmq::msg_t::init (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_) int zmq::msg_t::init (void* data_, size_t size_,
msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_)
{ {
if (size_ <= max_vsm_size) if (size_ < max_vsm_size) {
int const rc = init_size(size_);
if (rc != -1)
{ {
int rc = init_size(size_);
memcpy(data(), data_, size_); memcpy(data(), data_, size_);
return rc; return 0;
}
else
{
return -1;
}
}
else if(refcnt_)
{
return init_external_storage(data_, size_, refcnt_, ffn_, hint);
} }
else else
{ {
return init_data(data_, size_, ffn_, hint_); return init_data(data_, size_, ffn_, hint);
} }
} }
...@@ -95,22 +103,47 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -95,22 +103,47 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.data = malloc(size_); u.lmsg.content =
if (unlikely (!u.lmsg.data)) { (content_t*) malloc (sizeof (content_t) + size_);
if (unlikely (!u.lmsg.content)) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
u.lmsg.size = size_; u.lmsg.content->data = u.lmsg.content + 1;
u.lmsg.ffn = NULL; u.lmsg.content->size = size_;
u.lmsg.hint = NULL; u.lmsg.content->ffn = NULL;
new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t (); u.lmsg.content->hint = NULL;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
} }
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr,
void *hint_) msg_free_fn *ffn_, void *hint_)
{
zmq_assert(NULL != data_);
zmq_assert(NULL != ctr);
file_desc = -1;
u.zclmsg.metadata = NULL;
u.zclmsg.type = type_zclmsg;
u.zclmsg.flags = 0;
u.zclmsg.routing_id = 0;
u.zclmsg.data = data_;
u.zclmsg.size = size_;
u.zclmsg.ffn = ffn_;
u.zclmsg.hint = hint_;
u.zclmsg.refcnt = ctr;
new (u.zclmsg.refcnt) zmq::atomic_counter_t();
return 0;
}
int zmq::msg_t::init_data (void *data_, size_t size_,
msg_free_fn *ffn_, void *hint_)
{ {
// If data is NULL and size is not 0, a segfault // If data is NULL and size is not 0, a segfault
// would occur once the data is accessed // would occur once the data is accessed
...@@ -132,12 +165,17 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, ...@@ -132,12 +165,17 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
u.lmsg.data = data_; u.lmsg.content->data = data_;
u.lmsg.size = size_; u.lmsg.content->size = size_;
u.lmsg.ffn = ffn_; u.lmsg.content->ffn = ffn_;
u.lmsg.hint = hint_; u.lmsg.content->hint = hint_;
new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -152,13 +190,6 @@ int zmq::msg_t::init_delimiter () ...@@ -152,13 +190,6 @@ int zmq::msg_t::init_delimiter ()
return 0; return 0;
} }
zmq::atomic_counter_t& zmq::msg_t::msg_counter()
{
zmq_assert( is_lmsg() );
void* ptr = static_cast<void*>( &u.lmsg.refcnt.counter );
return *static_cast<atomic_counter_t*>( ptr );
}
int zmq::msg_t::close () int zmq::msg_t::close ()
{ {
// Check the validity of the message. // Check the validity of the message.
...@@ -172,14 +203,34 @@ int zmq::msg_t::close () ...@@ -172,14 +203,34 @@ int zmq::msg_t::close ()
// If the content is not shared, or if it is shared and the reference // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it. // count has dropped to zero, deallocate it.
if (!(u.lmsg.flags & msg_t::shared) || if (!(u.lmsg.flags & msg_t::shared) ||
!msg_counter().sub (1)) { !u.lmsg.content->refcnt.sub (1)) {
if (u.lmsg.ffn) { // We used "placement new" operator to initialize the reference
u.lmsg.ffn(u.lmsg.data, u.lmsg.hint); // counter so we call the destructor explicitly now.
u.lmsg.content->refcnt.~atomic_counter_t ();
if (u.lmsg.content->ffn)
u.lmsg.content->ffn (u.lmsg.content->data,
u.lmsg.content->hint);
free (u.lmsg.content);
} }
else {
free (u.lmsg.data);
} }
if (is_zcmsg())
{
zmq_assert( u.zclmsg.ffn );
// If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
if (!(u.zclmsg.flags & msg_t::shared) ||
!u.zclmsg.refcnt->sub (1)) {
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u.zclmsg.refcnt->~atomic_counter_t ();
u.zclmsg.ffn (u.zclmsg.data,
u.zclmsg.hint);
} }
} }
...@@ -226,18 +277,29 @@ int zmq::msg_t::copy (msg_t &src_) ...@@ -226,18 +277,29 @@ int zmq::msg_t::copy (msg_t &src_)
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return rc; return rc;
if (src_.u.base.type == type_lmsg) { if (src_.u.base.type == type_lmsg ) {
// One reference is added to shared messages. Non-shared messages // One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2. // are turned into shared messages and reference count is set to 2.
if (src_.u.lmsg.flags & msg_t::shared) if (src_.u.lmsg.flags & msg_t::shared)
src_.msg_counter().add (1); src_.u.lmsg.content->refcnt.add (1);
else { else {
src_.u.lmsg.flags |= msg_t::shared; src_.u.lmsg.flags |= msg_t::shared;
src_.msg_counter().set (2); src_.u.lmsg.content->refcnt.set (2);
} }
} }
if (src_.is_zcmsg()) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
if (src_.u.zclmsg.flags & msg_t::shared)
src_.refcnt()->add (1);
else {
src_.u.zclmsg.flags |= msg_t::shared;
src_.refcnt()->set (2);
}
}
if (src_.u.base.metadata != NULL) if (src_.u.base.metadata != NULL)
src_.u.base.metadata->add_ref (); src_.u.base.metadata->add_ref ();
...@@ -256,9 +318,11 @@ void *zmq::msg_t::data () ...@@ -256,9 +318,11 @@ void *zmq::msg_t::data ()
case type_vsm: case type_vsm:
return u.vsm.data; return u.vsm.data;
case type_lmsg: case type_lmsg:
return u.lmsg.data; return u.lmsg.content->data;
case type_cmsg: case type_cmsg:
return u.cmsg.data; return u.cmsg.data;
case type_zclmsg:
return u.zclmsg.data;
default: default:
zmq_assert (false); zmq_assert (false);
return NULL; return NULL;
...@@ -274,7 +338,9 @@ size_t zmq::msg_t::size () ...@@ -274,7 +338,9 @@ size_t zmq::msg_t::size ()
case type_vsm: case type_vsm:
return u.vsm.size; return u.vsm.size;
case type_lmsg: case type_lmsg:
return u.lmsg.size; return u.lmsg.content->size;
case type_zclmsg:
return u.zclmsg.size;
case type_cmsg: case type_cmsg:
return u.cmsg.size; return u.cmsg.size;
default: default:
...@@ -345,19 +411,19 @@ bool zmq::msg_t::is_delimiter () const ...@@ -345,19 +411,19 @@ bool zmq::msg_t::is_delimiter () const
return u.base.type == type_delimiter; return u.base.type == type_delimiter;
} }
bool zmq::msg_t::is_vsm () bool zmq::msg_t::is_vsm () const
{ {
return u.base.type == type_vsm; return u.base.type == type_vsm;
} }
bool zmq::msg_t::is_lmsg () const bool zmq::msg_t::is_cmsg () const
{ {
return u.base.type == type_lmsg; return u.base.type == type_cmsg;
} }
bool zmq::msg_t::is_cmsg () bool zmq::msg_t::is_zcmsg() const
{ {
return u.base.type == type_cmsg; return u.base.type == type_zclmsg;
} }
void zmq::msg_t::add_refs (int refs_) void zmq::msg_t::add_refs (int refs_)
...@@ -373,12 +439,12 @@ void zmq::msg_t::add_refs (int refs_) ...@@ -373,12 +439,12 @@ void zmq::msg_t::add_refs (int refs_)
// VSMs, CMSGS and delimiters can be copied straight away. The only // VSMs, CMSGS and delimiters can be copied straight away. The only
// message type that needs special care are long messages. // message type that needs special care are long messages.
if (u.base.type == type_lmsg) { if (u.base.type == type_lmsg || is_zcmsg() ) {
if (u.lmsg.flags & msg_t::shared) if (u.base.flags & msg_t::shared)
msg_counter().add (refs_); refcnt()->add (refs_);
else { else {
msg_counter().set (refs_ + 1); refcnt()->set (refs_ + 1);
u.lmsg.flags |= msg_t::shared; u.base.flags |= msg_t::shared;
} }
} }
} }
...@@ -395,20 +461,29 @@ bool zmq::msg_t::rm_refs (int refs_) ...@@ -395,20 +461,29 @@ bool zmq::msg_t::rm_refs (int refs_)
return true; return true;
// If there's only one reference close the message. // If there's only one reference close the message.
if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) { if ( (u.base.type != type_zclmsg && u.base.type != type_lmsg) || !(u.base.flags & msg_t::shared)) {
close (); close ();
return false; return false;
} }
// The only message type that needs special care are long messages. // The only message type that needs special care are long and zcopy messages.
if (!msg_counter().sub (refs_)) { if (!u.lmsg.content->refcnt.sub (refs_)) {
// We used "placement new" operator to initialize the reference // We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now. // counter so we call the destructor explicitly now.
msg_counter().~atomic_counter_t (); u.lmsg.content->refcnt.~atomic_counter_t ();
if (u.lmsg.content->ffn)
u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint);
free (u.lmsg.content);
if (u.lmsg.ffn) return false;
u.lmsg.ffn (u.lmsg.data, u.lmsg.hint); }
free (u.lmsg.data);
if (!u.zclmsg.refcnt->sub (refs_)) {
// storage for rfcnt is provided externally
if (u.zclmsg.ffn) {
u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint);
}
return false; return false;
} }
...@@ -427,3 +502,16 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_) ...@@ -427,3 +502,16 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_)
return 0; return 0;
} }
zmq::atomic_counter_t* zmq::msg_t::refcnt()
{
switch(u.base.type)
{
case type_lmsg:
return &u.lmsg.content->refcnt;
case type_zclmsg:
return u.zclmsg.refcnt;
default:
zmq_assert(false);
return NULL;
}
}
...@@ -54,6 +54,7 @@ namespace zmq ...@@ -54,6 +54,7 @@ namespace zmq
class msg_t class msg_t
{ {
public: public:
// Message flags. // Message flags.
enum enum
{ {
...@@ -65,12 +66,17 @@ namespace zmq ...@@ -65,12 +66,17 @@ namespace zmq
}; };
bool check (); bool check ();
int init (void *data_, size_t size_, msg_free_fn *ffn_, int init();
void *hint_);
int init (); int init (void* data, size_t size_,
msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_ = NULL);
int init_size (size_t size_); int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_, int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_); void *hint_);
int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr,
msg_free_fn *ffn_, void *hint_);
int init_delimiter (); int init_delimiter ();
int close (); int close ();
int move (msg_t &src_); int move (msg_t &src_);
...@@ -88,9 +94,9 @@ namespace zmq ...@@ -88,9 +94,9 @@ namespace zmq
bool is_identity () const; bool is_identity () const;
bool is_credential () const; bool is_credential () const;
bool is_delimiter () const; bool is_delimiter () const;
bool is_vsm (); bool is_vsm () const;
bool is_lmsg () const; bool is_cmsg () const;
bool is_cmsg (); bool is_zcmsg() const;
uint32_t get_routing_id(); uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_); int set_routing_id(uint32_t routing_id_);
...@@ -102,13 +108,30 @@ namespace zmq ...@@ -102,13 +108,30 @@ namespace zmq
// references drops to 0, the message is closed and false is returned. // references drops to 0, the message is closed and false is returned.
bool rm_refs (int refs_); bool rm_refs (int refs_);
private:
// Size in bytes of the largest message that is still copied around // Size in bytes of the largest message that is still copied around
// rather than being reference-counted. // rather than being reference-counted.
enum { msg_t_size = 64 }; enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };
private:
zmq::atomic_counter_t* refcnt();
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Different message types. // Different message types.
enum type_t enum type_t
{ {
...@@ -121,10 +144,12 @@ namespace zmq ...@@ -121,10 +144,12 @@ namespace zmq
type_delimiter = 103, type_delimiter = 103,
// CMSG messages point to constant data // CMSG messages point to constant data
type_cmsg = 104, type_cmsg = 104,
type_max = 104
};
atomic_counter_t& msg_counter(); // zero-copy LMSG message for v2_decoder
type_zclmsg = 105,
type_max = 105
};
// the file descriptor where this message originated, needs to be 64bit due to alignment // the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc; int64_t file_desc;
...@@ -149,36 +174,36 @@ namespace zmq ...@@ -149,36 +174,36 @@ namespace zmq
unsigned char flags; unsigned char flags;
uint32_t routing_id; uint32_t routing_id;
} vsm; } vsm;
struct lmsg_t { struct {
metadata_t *metadata;
content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *)
+ sizeof (content_t*)
+ 2
+ sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} lmsg;
struct {
metadata_t *metadata; metadata_t *metadata;
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
void *data; void *data;
size_t size; size_t size;
msg_free_fn *ffn; msg_free_fn *ffn;
void *hint; void *hint;
// create an aligned block for an atomic_counter_t object zmq::atomic_counter_t* refcnt;
union aligned_atomic_counter_storage {
zmq::atomic_counter_t::integer_t maxAlign;
unsigned char counter[ sizeof(zmq::atomic_counter_t) ];
} refcnt;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *)
+ sizeof(void*) + sizeof (void*)
+ sizeof(size_t) + sizeof (size_t)
+ sizeof(msg_free_fn*) + sizeof (msg_free_fn*)
+ sizeof(void*) + sizeof (void*)
+ sizeof(aligned_atomic_counter_storage) + sizeof (zmq::atomic_counter_t*)
+ 2 + 2
+ sizeof(uint32_t))]; + sizeof(uint32_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
uint32_t routing_id; uint32_t routing_id;
} lmsg; } zclmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
void* data; void* data;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <cmath>
#include "platform.hpp" #include "platform.hpp"
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
...@@ -44,7 +45,8 @@ ...@@ -44,7 +45,8 @@
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
buf(NULL), buf(NULL),
bufsize( 0 ), bufsize( 0 ),
maxsize( bufsize_ ) maxsize( bufsize_ ),
msg_refcnt( NULL )
{ {
} }
...@@ -67,9 +69,14 @@ unsigned char* zmq::shared_message_memory_allocator::allocate() ...@@ -67,9 +69,14 @@ unsigned char* zmq::shared_message_memory_allocator::allocate()
// @todo aligmnet padding may be needed // @todo aligmnet padding may be needed
if (!buf) if (!buf)
{ {
buf = (unsigned char *) malloc(bufsize + sizeof(zmq::atomic_counter_t)); // allocate memory for reference counters together with reception buffer
size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size);
size_t const allocationsize = maxsize + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t);
buf = (unsigned char *) malloc(allocationsize);
alloc_assert (buf); alloc_assert (buf);
new(buf) atomic_counter_t(1); new(buf) atomic_counter_t(1);
msg_refcnt = reinterpret_cast<zmq::atomic_counter_t*>( buf + sizeof(atomic_counter_t) + maxsize );
} }
bufsize = maxsize; bufsize = maxsize;
...@@ -81,6 +88,7 @@ void zmq::shared_message_memory_allocator::deallocate() ...@@ -81,6 +88,7 @@ void zmq::shared_message_memory_allocator::deallocate()
free(buf); free(buf);
buf = NULL; buf = NULL;
bufsize = 0; bufsize = 0;
msg_refcnt = NULL;
} }
unsigned char* zmq::shared_message_memory_allocator::release() unsigned char* zmq::shared_message_memory_allocator::release()
...@@ -88,16 +96,11 @@ unsigned char* zmq::shared_message_memory_allocator::release() ...@@ -88,16 +96,11 @@ unsigned char* zmq::shared_message_memory_allocator::release()
unsigned char* b = buf; unsigned char* b = buf;
buf = NULL; buf = NULL;
bufsize = 0; bufsize = 0;
msg_refcnt = NULL;
return b; return b;
} }
void zmq::shared_message_memory_allocator::reset(unsigned char* b)
{
deallocate();
buf = b;
}
void zmq::shared_message_memory_allocator::inc_ref() void zmq::shared_message_memory_allocator::inc_ref()
{ {
((zmq::atomic_counter_t*)buf)->add(1); ((zmq::atomic_counter_t*)buf)->add(1);
...@@ -204,6 +207,7 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p ...@@ -204,6 +207,7 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
// the current message can exceed the current buffer. We have to copy the buffer // the current message can exceed the current buffer. We have to copy the buffer
// data into a new message and complete it in the next receive. // data into a new message and complete it in the next receive.
if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size()))) if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
{ {
// a new message has started, but the size would exceed the pre-allocated arena // a new message has started, but the size would exceed the pre-allocated arena
...@@ -214,12 +218,13 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p ...@@ -214,12 +218,13 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
{ {
// construct message using n bytes from the buffer as storage // construct message using n bytes from the buffer as storage
// increase buffer ref count // increase buffer ref count
rc = in_progress.init( (unsigned char*)read_pos, // if the message will be a large message, pass a valid refcnt memory location as well
msg_size, shared_message_memory_allocator::call_dec_ref, rc = in_progress.init( (unsigned char*)read_pos, msg_size,
buffer() ); shared_message_memory_allocator::call_dec_ref, buffer(),
create_refcnt() );
// For small messages, data has been copied and refcount does not have to be increased // For small messages, data has been copied and refcount does not have to be increased
if (in_progress.is_lmsg()) if (in_progress.is_zcmsg())
{ {
inc_ref(); inc_ref();
} }
......
...@@ -63,8 +63,6 @@ namespace zmq ...@@ -63,8 +63,6 @@ namespace zmq
// the messages constructed on top of it. // the messages constructed on top of it.
unsigned char* release(); unsigned char* release();
void reset(unsigned char* b);
void inc_ref(); void inc_ref();
static void call_dec_ref(void*, void* buffer); static void call_dec_ref(void*, void* buffer);
...@@ -85,6 +83,12 @@ namespace zmq ...@@ -85,6 +83,12 @@ namespace zmq
bufsize = new_size; bufsize = new_size;
} }
//
zmq::atomic_counter_t* create_refcnt()
{
return msg_refcnt++;
}
private: private:
unsigned char* buf; unsigned char* buf;
size_t bufsize; size_t bufsize;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment