Commit e70b5efa authored by Jens Auer's avatar Jens Auer

Reuse buffer when no messages depend on it.

parent 51cb57e2
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_): zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
buf(NULL), buf(NULL),
bufsize( 0 ), bufsize( 0 ),
maxsize( bufsize_ ), max_size( bufsize_ ),
msg_refcnt( NULL ) msg_refcnt( NULL )
{ {
...@@ -53,7 +53,9 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t buf ...@@ -53,7 +53,9 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t buf
zmq::shared_message_memory_allocator::~shared_message_memory_allocator() zmq::shared_message_memory_allocator::~shared_message_memory_allocator()
{ {
deallocate(); if (buf) {
deallocate();
}
} }
unsigned char* zmq::shared_message_memory_allocator::allocate() unsigned char* zmq::shared_message_memory_allocator::allocate()
...@@ -61,25 +63,38 @@ unsigned char* zmq::shared_message_memory_allocator::allocate() ...@@ -61,25 +63,38 @@ unsigned char* zmq::shared_message_memory_allocator::allocate()
if (buf) if (buf)
{ {
// release reference count to couple lifetime to messages // release reference count to couple lifetime to messages
call_dec_ref(NULL, buf); zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
// release pointer because we are going to create a new buffer
release(); // if refcnt drops to 0, there are no message using the buffer
// because either all messages have been closed or only vsm-messages
// were created
if (c->sub(1)) {
// buffer is still in use as message data. "Release" it and create a new one
// release pointer because we are going to create a new buffer
release();
}
} }
// @todo aligmnet padding may be needed // if buf != NULL it is not used by any message so we can re-use it for the next run
if (!buf) if (!buf) {
{
// allocate memory for reference counters together with reception buffer // allocate memory for reference counters together with reception buffer
size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size); size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size);
size_t const allocationsize = maxsize + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t); size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t);
buf = (unsigned char *) malloc(allocationsize); buf = (unsigned char *) malloc(allocationsize);
alloc_assert (buf); alloc_assert (buf);
new(buf) atomic_counter_t(1); new(buf) atomic_counter_t(1);
msg_refcnt = reinterpret_cast<zmq::atomic_counter_t*>( buf + sizeof(atomic_counter_t) + maxsize ); }
else
{
// release reference count to couple lifetime to messages
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
c->set(1);
} }
bufsize = maxsize; bufsize = max_size;
msg_refcnt = reinterpret_cast<zmq::atomic_counter_t*>( buf + sizeof(atomic_counter_t) + max_size );
return buf + sizeof( zmq::atomic_counter_t); return buf + sizeof( zmq::atomic_counter_t);
} }
...@@ -108,31 +123,24 @@ void zmq::shared_message_memory_allocator::inc_ref() ...@@ -108,31 +123,24 @@ void zmq::shared_message_memory_allocator::inc_ref()
void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) { void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) {
zmq_assert( hint ); zmq_assert( hint );
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(hint); unsigned char* buf = static_cast<unsigned char*>(hint);
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
if (!c->sub(1)) { if (!c->sub(1)) {
c->~atomic_counter_t(); c->~atomic_counter_t();
free(hint); free(buf);
buf = NULL;
} }
} }
size_t zmq::shared_message_memory_allocator::size() const size_t zmq::shared_message_memory_allocator::size() const
{ {
if (buf) return bufsize;
{
return bufsize;
}
else
{
return 0;
}
} }
unsigned char* zmq::shared_message_memory_allocator::data() unsigned char* zmq::shared_message_memory_allocator::data()
{ {
zmq_assert(buf);
return buf + sizeof(zmq::atomic_counter_t); return buf + sizeof(zmq::atomic_counter_t);
} }
......
...@@ -92,7 +92,7 @@ namespace zmq ...@@ -92,7 +92,7 @@ namespace zmq
private: private:
unsigned char* buf; unsigned char* buf;
size_t bufsize; size_t bufsize;
size_t maxsize; size_t max_size;
zmq::atomic_counter_t* msg_refcnt; zmq::atomic_counter_t* msg_refcnt;
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment