Commit d33fb6a2 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1441 from jens-auer/rebase3

parents b3f2acf7 e70b5efa
......@@ -643,7 +643,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases
TESTS = $(test_apps)
XFAIL_TESTS = tests/test_msg_ffn
XFAIL_TESTS += tests/test_abstract_ipc
......@@ -73,6 +73,10 @@ namespace zmq
return bufsize;
void resize(size_t new_size)
bufsize = new_size;
size_t bufsize;
unsigned char* buf;
......@@ -190,6 +194,11 @@ namespace zmq
return 0;
virtual void resize_buffer(size_t new_size)
// Prototype of state machine action. Action should return false if
......@@ -46,6 +46,7 @@ namespace zmq
virtual void get_buffer (unsigned char **data_, size_t *size_) = 0;
virtual void resize_buffer(size_t) = 0;
// Decodes data pointed to by data_.
// When a message is decoded, 1 is returned.
// When the decoder needs more data, 0 is returnd.
......@@ -54,6 +55,8 @@ namespace zmq
size_t &processed) = 0;
virtual msg_t *msg () = 0;
This diff is collapsed.
......@@ -54,6 +54,7 @@ namespace zmq
class msg_t
// Message flags.
......@@ -65,12 +66,17 @@ namespace zmq
bool check ();
int init (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_);
int init ();
int init();
int init (void* data, size_t size_,
msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_ = NULL);
int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_);
int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr,
msg_free_fn *ffn_, void *hint_);
int init_delimiter ();
int close ();
int move (msg_t &src_);
......@@ -88,9 +94,9 @@ namespace zmq
bool is_identity () const;
bool is_credential () const;
bool is_delimiter () const;
bool is_vsm ();
bool is_lmsg () const;
bool is_cmsg ();
bool is_vsm () const;
bool is_cmsg () const;
bool is_zcmsg() const;
uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_);
......@@ -102,13 +108,30 @@ namespace zmq
// references drops to 0, the message is closed and false is returned.
bool rm_refs (int refs_);
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };
zmq::atomic_counter_t* refcnt();
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
// Different message types.
enum type_t
......@@ -121,10 +144,12 @@ namespace zmq
type_delimiter = 103,
// CMSG messages point to constant data
type_cmsg = 104,
type_max = 104
atomic_counter_t& msg_counter();
// zero-copy LMSG message for v2_decoder
type_zclmsg = 105,
type_max = 105
// the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc;
......@@ -149,36 +174,36 @@ namespace zmq
unsigned char flags;
uint32_t routing_id;
} vsm;
struct lmsg_t {
struct {
metadata_t *metadata;
content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *)
+ sizeof (content_t*)
+ 2
+ sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} lmsg;
struct {
metadata_t *metadata;
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
// create an aligned block for an atomic_counter_t object
union aligned_atomic_counter_storage {
zmq::atomic_counter_t::integer_t maxAlign;
unsigned char counter[ sizeof(zmq::atomic_counter_t) ];
} refcnt;
zmq::atomic_counter_t* refcnt;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *)
+ sizeof(void*)
+ sizeof(size_t)
+ sizeof(msg_free_fn*)
+ sizeof(void*)
+ sizeof(aligned_atomic_counter_storage)
+ sizeof (void*)
+ sizeof (size_t)
+ sizeof (msg_free_fn*)
+ sizeof (void*)
+ sizeof (zmq::atomic_counter_t*)
+ 2
+ sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} lmsg;
} zclmsg;
struct {
metadata_t *metadata;
void* data;
......@@ -56,7 +56,7 @@ namespace zmq
virtual msg_t *msg () { return &in_progress; }
virtual void resize_buffer(size_t) {}
......@@ -295,6 +295,7 @@ void zmq::stream_engine_t::in_event ()
decoder->get_buffer (&inpos, &bufsize);
const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error (connection_error);
......@@ -307,6 +308,8 @@ void zmq::stream_engine_t::in_event ()
// Adjust input size
insize = static_cast <size_t> (rc);
// Adjust buffer size to received bytes
int rc = 0;
......@@ -29,6 +29,7 @@
#include <stdlib.h>
#include <string.h>
#include <cmath>
#include "platform.hpp"
......@@ -43,14 +44,18 @@
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
bufsize( bufsize_ )
bufsize( 0 ),
max_size( bufsize_ ),
msg_refcnt( NULL )
if (buf) {
unsigned char* zmq::shared_message_memory_allocator::allocate()
......@@ -58,19 +63,38 @@ unsigned char* zmq::shared_message_memory_allocator::allocate()
if (buf)
// release reference count to couple lifetime to messages
call_dec_ref(NULL, buf);
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
// if refcnt drops to 0, there are no message using the buffer
// because either all messages have been closed or only vsm-messages
// were created
if (c->sub(1)) {
// buffer is still in use as message data. "Release" it and create a new one
// release pointer because we are going to create a new buffer
// @todo aligmnet padding may be needed
if (!buf)
buf = (unsigned char *) malloc(bufsize + sizeof(zmq::atomic_counter_t));
// if buf != NULL it is not used by any message so we can re-use it for the next run
if (!buf) {
// allocate memory for reference counters together with reception buffer
size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size);
size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t);
buf = (unsigned char *) malloc(allocationsize);
alloc_assert (buf);
new(buf) atomic_counter_t(1);
// release reference count to couple lifetime to messages
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
bufsize = max_size;
msg_refcnt = reinterpret_cast<zmq::atomic_counter_t*>( buf + sizeof(atomic_counter_t) + max_size );
return buf + sizeof( zmq::atomic_counter_t);
......@@ -78,19 +102,18 @@ void zmq::shared_message_memory_allocator::deallocate()
buf = NULL;
bufsize = 0;
msg_refcnt = NULL;
unsigned char* zmq::shared_message_memory_allocator::release()
unsigned char* b = buf;
buf = NULL;
return b;
bufsize = 0;
msg_refcnt = NULL;
void zmq::shared_message_memory_allocator::reset(unsigned char* b)
buf = b;
return b;
void zmq::shared_message_memory_allocator::inc_ref()
......@@ -100,31 +123,24 @@ void zmq::shared_message_memory_allocator::inc_ref()
void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) {
zmq_assert( hint );
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(hint);
unsigned char* buf = static_cast<unsigned char*>(hint);
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
if (!c->sub(1)) {
buf = NULL;
size_t zmq::shared_message_memory_allocator::size() const
if (buf)
return bufsize;
return 0;
unsigned char* zmq::shared_message_memory_allocator::data()
return buf + sizeof(zmq::atomic_counter_t);
......@@ -199,6 +215,7 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
// the current message can exceed the current buffer. We have to copy the buffer
// data into a new message and complete it in the next receive.
if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
// a new message has started, but the size would exceed the pre-allocated arena
......@@ -209,12 +226,13 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
// construct message using n bytes from the buffer as storage
// increase buffer ref count
rc = in_progress.init( (unsigned char*)read_pos,
msg_size, shared_message_memory_allocator::call_dec_ref,
buffer() );
// if the message will be a large message, pass a valid refcnt memory location as well
rc = in_progress.init( (unsigned char*)read_pos, msg_size,
shared_message_memory_allocator::call_dec_ref, buffer(),
create_refcnt() );
// For small messages, data has been copied and refcount does not have to be increased
if (in_progress.is_lmsg())
if (in_progress.is_zcmsg())
......@@ -63,8 +63,6 @@ namespace zmq
// the messages constructed on top of it.
unsigned char* release();
void reset(unsigned char* b);
void inc_ref();
static void call_dec_ref(void*, void* buffer);
......@@ -80,9 +78,22 @@ namespace zmq
return buf;
void resize(size_t new_size)
bufsize = new_size;
zmq::atomic_counter_t* create_refcnt()
return msg_refcnt++;
unsigned char* buf;
size_t bufsize;
size_t max_size;
zmq::atomic_counter_t* msg_refcnt;
// Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment