Commit b2718149 authored by somdoron's avatar somdoron

msg external storage is using content_t

parent 15ad6f80
......@@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size
buf(NULL),
bufsize(0),
max_size(bufsize_),
msg_refcnt(NULL),
msg_content(NULL),
maxCounters (static_cast <size_t> (std::ceil (static_cast <double> (max_size) / static_cast <double> (msg_t::max_vsm_size))))
{
}
......@@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size
buf(NULL),
bufsize(0),
max_size(bufsize_),
msg_refcnt(NULL),
msg_content(NULL),
maxCounters(maxMessages)
{
}
......@@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate ()
// allocate memory for reference counters together with reception buffer
std::size_t const allocationsize =
max_size + sizeof (zmq::atomic_counter_t) +
maxCounters * sizeof (zmq::atomic_counter_t);
maxCounters * sizeof (zmq::msg_t::content_t);
buf = static_cast <unsigned char *> (std::malloc (allocationsize));
alloc_assert (buf);
......@@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate ()
}
bufsize = max_size;
msg_refcnt = reinterpret_cast <zmq::atomic_counter_t*> (buf + sizeof (atomic_counter_t) + max_size);
msg_content = reinterpret_cast <zmq::msg_t::content_t*> (buf + sizeof (atomic_counter_t) + max_size);
return buf + sizeof (zmq::atomic_counter_t);
}
......@@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release ()
unsigned char* b = buf;
buf = NULL;
bufsize = 0;
msg_refcnt = NULL;
msg_content = NULL;
return b;
}
......
......@@ -34,6 +34,7 @@
#include <cstdlib>
#include "atomic_counter.hpp"
#include "msg.hpp"
#include "err.hpp"
namespace zmq
......@@ -132,21 +133,21 @@ namespace zmq
bufsize = new_size;
}
zmq::atomic_counter_t* provide_refcnt ()
zmq::msg_t::content_t* provide_content ()
{
return msg_refcnt;
return msg_content;
}
void advance_refcnt ()
void advance_content ()
{
msg_refcnt++;
msg_content++;
}
private:
unsigned char* buf;
std::size_t bufsize;
std::size_t max_size;
zmq::atomic_counter_t* msg_refcnt;
zmq::msg_t::content_t* msg_content;
std::size_t maxCounters;
};
}
......
......@@ -53,7 +53,7 @@ bool zmq::msg_t::check ()
int zmq::msg_t::init (void* data_, size_t size_,
msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_)
content_t* content_)
{
if (size_ < max_vsm_size) {
int const rc = init_size(size_);
......@@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_,
return -1;
}
}
else if(refcnt_)
else if(content_)
{
return init_external_storage(data_, size_, refcnt_, ffn_, hint);
return init_external_storage(content_, data_, size_, ffn_, hint);
}
else
{
......@@ -122,12 +122,11 @@ int zmq::msg_t::init_size (size_t size_)
return 0;
}
int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr,
msg_free_fn *ffn_, void *hint_)
int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
msg_free_fn *ffn_, void* hint_)
{
zmq_assert(NULL != data_);
zmq_assert(NULL != ctr);
zmq_assert(NULL != content_);
u.zclmsg.metadata = NULL;
u.zclmsg.type = type_zclmsg;
......@@ -135,12 +134,12 @@ int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_cou
u.zclmsg.routing_id = 0;
u.zclmsg.fd = retired_fd;
u.zclmsg.data = data_;
u.zclmsg.size = size_;
u.zclmsg.ffn = ffn_;
u.zclmsg.hint = hint_;
u.zclmsg.refcnt = ctr;
new (u.zclmsg.refcnt) zmq::atomic_counter_t();
u.zclmsg.content = content_;
u.zclmsg.content->data = data_;
u.zclmsg.content->size = size_;
u.zclmsg.content->ffn = ffn_;
u.zclmsg.content->hint = hint_;
new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
return 0;
}
......@@ -222,19 +221,19 @@ int zmq::msg_t::close ()
if (is_zcmsg())
{
zmq_assert( u.zclmsg.ffn );
zmq_assert( u.zclmsg.content->ffn );
// If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
if (!(u.zclmsg.flags & msg_t::shared) ||
!u.zclmsg.refcnt->sub (1)) {
!u.zclmsg.content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u.zclmsg.refcnt->~atomic_counter_t ();
u.zclmsg.content->refcnt.~atomic_counter_t ();
u.zclmsg.ffn (u.zclmsg.data,
u.zclmsg.hint);
u.zclmsg.content->ffn (u.zclmsg.content->data,
u.zclmsg.content->hint);
}
}
......@@ -329,7 +328,7 @@ void *zmq::msg_t::data ()
case type_cmsg:
return u.cmsg.data;
case type_zclmsg:
return u.zclmsg.data;
return u.zclmsg.content->data;
default:
zmq_assert (false);
return NULL;
......@@ -347,7 +346,7 @@ size_t zmq::msg_t::size ()
case type_lmsg:
return u.lmsg.content->size;
case type_zclmsg:
return u.zclmsg.size;
return u.zclmsg.content->size;
case type_cmsg:
return u.cmsg.size;
default:
......@@ -487,10 +486,10 @@ bool zmq::msg_t::rm_refs (int refs_)
return false;
}
if (is_zcmsg() && !u.zclmsg.refcnt->sub(refs_)) {
if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
// storage for rfcnt is provided externally
if (u.zclmsg.ffn) {
u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint);
if (u.zclmsg.content->ffn) {
u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
}
return false;
......@@ -527,7 +526,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt()
case type_lmsg:
return &u.lmsg.content->refcnt;
case type_zclmsg:
return u.zclmsg.refcnt;
return &u.zclmsg.content->refcnt;
default:
zmq_assert(false);
return NULL;
......
......@@ -56,6 +56,22 @@ namespace zmq
{
public:
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Message flags.
enum
{
......@@ -71,12 +87,12 @@ namespace zmq
int init (void* data, size_t size_,
msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_ = NULL);
content_t* content_ = NULL);
int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_);
int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr,
int init_external_storage(content_t* content_, void *data_, size_t size_,
msg_free_fn *ffn_, void *hint_);
int init_delimiter ();
int close ();
......@@ -121,22 +137,6 @@ namespace zmq
private:
zmq::atomic_counter_t* refcnt();
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Different message types.
enum type_t
{
......@@ -196,17 +196,9 @@ namespace zmq
} lmsg;
struct {
metadata_t *metadata;
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t* refcnt;
content_t *content;
unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
sizeof (void*) +
sizeof (size_t) +
sizeof (msg_free_fn*) +
sizeof (void*) +
sizeof (zmq::atomic_counter_t*) +
sizeof (content_t*) +
2 +
sizeof (uint32_t) +
sizeof (fd_t))];
......
......@@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_,
{
int rc = in_progress.init ((unsigned char*)data_, size_,
shared_message_memory_allocator::call_dec_ref,
allocator.buffer(),
allocator.provide_refcnt() );
allocator.buffer (),
allocator.provide_content ());
// if the buffer serves as memory for a zero-copy message, release it
// and allocate a new buffer in get_buffer for the next decode
if (in_progress.is_zcmsg()) {
allocator.advance_refcnt();
if (in_progress.is_zcmsg ()) {
allocator.advance_content();
allocator.release();
}
......
......@@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
// if the message will be a large message, pass a valid refcnt memory location as well
rc = in_progress.init ((unsigned char *) read_pos, static_cast <size_t> (msg_size),
shared_message_memory_allocator::call_dec_ref, buffer(),
provide_refcnt ());
provide_content ());
// For small messages, data has been copied and refcount does not have to be increased
if (in_progress.is_zcmsg())
{
advance_refcnt();
advance_content();
inc_ref();
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment