Commit 8d85638f authored by Martin Sustrik's avatar Martin Sustrik

memory leak in message encoder fixed

parent 92aa9e94
...@@ -9,6 +9,7 @@ Dirk O. Kaar ...@@ -9,6 +9,7 @@ Dirk O. Kaar
Erich Heine Erich Heine
Frank Denis Frank Denis
George Neill George Neill
Jon Dyte
Martin Hurton Martin Hurton
Martin Lucina Martin Lucina
Martin Sustrik Martin Sustrik
......
...@@ -79,6 +79,9 @@ int main (int argc, char *argv []) ...@@ -79,6 +79,9 @@ int main (int argc, char *argv [])
if (elapsed == 0) if (elapsed == 0)
elapsed = 1; elapsed = 1;
rc = zmq_msg_close (&msg);
assert (rc == 0);
throughput = (unsigned long) throughput = (unsigned long)
((double) message_count / (double) elapsed * 1000000); ((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000; megabits = (double) (throughput * message_size * 8) / 1000000;
......
...@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
{ {
// The communication is unidirectional.
// We don't expect any message to arrive.
zmq_assert (out_pipe);
if (out_pipe->write (msg_)) { if (out_pipe->write (msg_)) {
zmq_msg_init (msg_); zmq_msg_init (msg_);
return true; return true;
......
...@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () ...@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else { else {
// TODO: Handle over-sized message decently. // TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf); int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf); size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently. // TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size); int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
bool zmq::zmq_decoder_t::message_ready () bool zmq::zmq_decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress)) if (!destination || !destination->write (&in_progress))
return false; return false;
......
...@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready () ...@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready () bool zmq::zmq_encoder_t::message_ready ()
{ {
// Destroy content of the old message.
zmq_msg_close(&in_progress);
// Read new message from the dispatcher. If there is none, return false. // Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way // Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine // unsuccessful write will cause retry on the next state machine
// invocation. // invocation.
if (!source || !source->read (&in_progress)) if (!source || !source->read (&in_progress)) {
zmq_msg_init (&in_progress);
return false; return false;
}
size_t size = zmq_msg_size (&in_progress); size_t size = zmq_msg_size (&in_progress);
......
...@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) ...@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
has_peer_identity = true; has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_), peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_)); zmq_msg_size (msg_));
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment