Commit 611e96c7 authored by Jens Auer's avatar Jens Auer

Allocation-free msg::init_data

With a msg_t size of 64 bytes, it becomes possible to embedd the content_t's members
struct for large messages directly in the msg_t. This saves the dynamic allocation
of content_t obejcts when using msg_t::init_data.

content_t contains a zmq::atomic_counter_t object which is not a POD in C++98
and thus it cannot be used as a member of the union u. To bypass this, C++11
is used which has relaxed rules for POD and atomic_counter is a C++11-POD. An
alternative would have been to make atomic_counter a classical POD by removing
constructors and all private member functions, i.e. have a struct and free functions
to manipulate it.

A new msg_t::init function is added which decides to either to copy the data for size<64 bytes
or use msg_t::init_data to do zero-copy initialization.
parent 03d6a734
...@@ -45,11 +45,30 @@ ...@@ -45,11 +45,30 @@
typedef char zmq_msg_size_check typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
// check whether the size of atomic_counter_t matches the size of the wrapped integer
// to ensure that the lsmg union is correctly aligned
typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::atomic_counter_t) == sizeof (zmq::atomic_counter_t::integer_t)) != 0) - 1];
bool zmq::msg_t::check () bool zmq::msg_t::check ()
{ {
return u.base.type >= type_min && u.base.type <= type_max; return u.base.type >= type_min && u.base.type <= type_max;
} }
int zmq::msg_t::init (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_)
{
if (size_ <= max_vsm_size)
{
int rc = init_size(size_);
memcpy(data(), data_, size_);
return rc;
}
else
{
return init_data(data_, size_, ffn_, hint_);
}
}
int zmq::msg_t::init () int zmq::msg_t::init ()
{ {
u.vsm.metadata = NULL; u.vsm.metadata = NULL;
...@@ -76,18 +95,16 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -76,18 +95,16 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.content = u.lmsg.data = malloc(size_);
(content_t*) malloc (sizeof (content_t) + size_); if (unlikely (!u.lmsg.data)) {
if (unlikely (!u.lmsg.content)) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
u.lmsg.content->data = u.lmsg.content + 1; u.lmsg.size = size_;
u.lmsg.content->size = size_; u.lmsg.ffn = NULL;
u.lmsg.content->ffn = NULL; u.lmsg.hint = NULL;
u.lmsg.content->hint = NULL; new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t ();
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
} }
...@@ -115,17 +132,12 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, ...@@ -115,17 +132,12 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
u.lmsg.content->data = data_; u.lmsg.data = data_;
u.lmsg.content->size = size_; u.lmsg.size = size_;
u.lmsg.content->ffn = ffn_; u.lmsg.ffn = ffn_;
u.lmsg.content->hint = hint_; u.lmsg.hint = hint_;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -140,6 +152,13 @@ int zmq::msg_t::init_delimiter () ...@@ -140,6 +152,13 @@ int zmq::msg_t::init_delimiter ()
return 0; return 0;
} }
zmq::atomic_counter_t& zmq::msg_t::msg_counter()
{
zmq_assert( is_lmsg() );
void* ptr = static_cast<void*>( &u.lmsg.refcnt.counter );
return *static_cast<atomic_counter_t*>( ptr );
}
int zmq::msg_t::close () int zmq::msg_t::close ()
{ {
// Check the validity of the message. // Check the validity of the message.
...@@ -153,16 +172,14 @@ int zmq::msg_t::close () ...@@ -153,16 +172,14 @@ int zmq::msg_t::close ()
// If the content is not shared, or if it is shared and the reference // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it. // count has dropped to zero, deallocate it.
if (!(u.lmsg.flags & msg_t::shared) || if (!(u.lmsg.flags & msg_t::shared) ||
!u.lmsg.content->refcnt.sub (1)) { !msg_counter().sub (1)) {
// We used "placement new" operator to initialize the reference if (u.lmsg.ffn) {
// counter so we call the destructor explicitly now. u.lmsg.ffn(u.lmsg.data, u.lmsg.hint);
u.lmsg.content->refcnt.~atomic_counter_t (); }
else {
if (u.lmsg.content->ffn) free (u.lmsg.data);
u.lmsg.content->ffn (u.lmsg.content->data, }
u.lmsg.content->hint);
free (u.lmsg.content);
} }
} }
...@@ -214,10 +231,10 @@ int zmq::msg_t::copy (msg_t &src_) ...@@ -214,10 +231,10 @@ int zmq::msg_t::copy (msg_t &src_)
// One reference is added to shared messages. Non-shared messages // One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2. // are turned into shared messages and reference count is set to 2.
if (src_.u.lmsg.flags & msg_t::shared) if (src_.u.lmsg.flags & msg_t::shared)
src_.u.lmsg.content->refcnt.add (1); src_.msg_counter().add (1);
else { else {
src_.u.lmsg.flags |= msg_t::shared; src_.u.lmsg.flags |= msg_t::shared;
src_.u.lmsg.content->refcnt.set (2); src_.msg_counter().set (2);
} }
} }
...@@ -239,7 +256,7 @@ void *zmq::msg_t::data () ...@@ -239,7 +256,7 @@ void *zmq::msg_t::data ()
case type_vsm: case type_vsm:
return u.vsm.data; return u.vsm.data;
case type_lmsg: case type_lmsg:
return u.lmsg.content->data; return u.lmsg.data;
case type_cmsg: case type_cmsg:
return u.cmsg.data; return u.cmsg.data;
default: default:
...@@ -257,7 +274,7 @@ size_t zmq::msg_t::size () ...@@ -257,7 +274,7 @@ size_t zmq::msg_t::size ()
case type_vsm: case type_vsm:
return u.vsm.size; return u.vsm.size;
case type_lmsg: case type_lmsg:
return u.lmsg.content->size; return u.lmsg.size;
case type_cmsg: case type_cmsg:
return u.cmsg.size; return u.cmsg.size;
default: default:
...@@ -333,6 +350,11 @@ bool zmq::msg_t::is_vsm () ...@@ -333,6 +350,11 @@ bool zmq::msg_t::is_vsm ()
return u.base.type == type_vsm; return u.base.type == type_vsm;
} }
bool zmq::msg_t::is_lmsg () const
{
return u.base.type == type_lmsg;
}
bool zmq::msg_t::is_cmsg () bool zmq::msg_t::is_cmsg ()
{ {
return u.base.type == type_cmsg; return u.base.type == type_cmsg;
...@@ -353,9 +375,9 @@ void zmq::msg_t::add_refs (int refs_) ...@@ -353,9 +375,9 @@ void zmq::msg_t::add_refs (int refs_)
// message type that needs special care are long messages. // message type that needs special care are long messages.
if (u.base.type == type_lmsg) { if (u.base.type == type_lmsg) {
if (u.lmsg.flags & msg_t::shared) if (u.lmsg.flags & msg_t::shared)
u.lmsg.content->refcnt.add (refs_); msg_counter().add (refs_);
else { else {
u.lmsg.content->refcnt.set (refs_ + 1); msg_counter().set (refs_ + 1);
u.lmsg.flags |= msg_t::shared; u.lmsg.flags |= msg_t::shared;
} }
} }
...@@ -379,14 +401,14 @@ bool zmq::msg_t::rm_refs (int refs_) ...@@ -379,14 +401,14 @@ bool zmq::msg_t::rm_refs (int refs_)
} }
// The only message type that needs special care are long messages. // The only message type that needs special care are long messages.
if (!u.lmsg.content->refcnt.sub (refs_)) { if (!msg_counter().sub (refs_)) {
// We used "placement new" operator to initialize the reference // We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now. // counter so we call the destructor explicitly now.
u.lmsg.content->refcnt.~atomic_counter_t (); msg_counter().~atomic_counter_t ();
if (u.lmsg.content->ffn) if (u.lmsg.ffn)
u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint); u.lmsg.ffn (u.lmsg.data, u.lmsg.hint);
free (u.lmsg.content); free (u.lmsg.data);
return false; return false;
} }
...@@ -404,3 +426,4 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_) ...@@ -404,3 +426,4 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_)
u.base.routing_id = routing_id_; u.base.routing_id = routing_id_;
return 0; return 0;
} }
...@@ -54,7 +54,6 @@ namespace zmq ...@@ -54,7 +54,6 @@ namespace zmq
class msg_t class msg_t
{ {
public: public:
// Message flags. // Message flags.
enum enum
{ {
...@@ -66,6 +65,8 @@ namespace zmq ...@@ -66,6 +65,8 @@ namespace zmq
}; };
bool check (); bool check ();
int init (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_);
int init (); int init ();
int init_size (size_t size_); int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_, int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
...@@ -88,6 +89,7 @@ namespace zmq ...@@ -88,6 +89,7 @@ namespace zmq
bool is_credential () const; bool is_credential () const;
bool is_delimiter () const; bool is_delimiter () const;
bool is_vsm (); bool is_vsm ();
bool is_lmsg () const;
bool is_cmsg (); bool is_cmsg ();
uint32_t get_routing_id(); uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_); int set_routing_id(uint32_t routing_id_);
...@@ -107,22 +109,6 @@ namespace zmq ...@@ -107,22 +109,6 @@ namespace zmq
enum { msg_t_size = 64 }; enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Different message types. // Different message types.
enum type_t enum type_t
{ {
...@@ -138,6 +124,8 @@ namespace zmq ...@@ -138,6 +124,8 @@ namespace zmq
type_max = 104 type_max = 104
}; };
atomic_counter_t& msg_counter();
// the file descriptor where this message originated, needs to be 64bit due to alignment // the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc; int64_t file_desc;
...@@ -161,10 +149,32 @@ namespace zmq ...@@ -161,10 +149,32 @@ namespace zmq
unsigned char flags; unsigned char flags;
uint32_t routing_id; uint32_t routing_id;
} vsm; } vsm;
struct { struct lmsg_t {
metadata_t *metadata; metadata_t *metadata;
content_t *content; // Shared message buffer. Message data are either allocated in one
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2 + sizeof(uint32_t))]; // continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
// create an aligned block for an atomic_counter_t object
union aligned_atomic_counter_storage {
zmq::atomic_counter_t::integer_t maxAlign;
unsigned char counter[ sizeof(zmq::atomic_counter_t) ];
} refcnt;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *)
+ sizeof(void*)
+ sizeof(size_t)
+ sizeof(msg_free_fn*)
+ sizeof(void*)
+ sizeof(aligned_atomic_counter_storage)
+ 2
+ sizeof(uint32_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
uint32_t routing_id; uint32_t routing_id;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment