Commit b7f2c7e7 authored by Dave Olszewski's avatar Dave Olszewski

Increment metadata refcount while it's in pending_metadata

parent 16f58474
...@@ -89,6 +89,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -89,6 +89,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// Apply the subscription to the trie // Apply the subscription to the trie
unsigned char *const data = (unsigned char *) sub.data (); unsigned char *const data = (unsigned char *) sub.data ();
const size_t size = sub.size (); const size_t size = sub.size ();
metadata_t* metadata = sub.metadata();
if (size > 0 && (*data == 0 || *data == 1)) { if (size > 0 && (*data == 0 || *data == 1)) {
if (manual) if (manual)
{ {
...@@ -100,7 +101,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -100,7 +101,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
pending_pipes.push_back(pipe_); pending_pipes.push_back(pipe_);
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); if (metadata)
metadata->add_ref();
pending_metadata.push_back(metadata);
pending_flags.push_back(0); pending_flags.push_back(0);
} }
else else
...@@ -117,7 +120,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -117,7 +120,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
(*data == 0 && verbose_unsubs && verbose_subs))) { (*data == 0 && verbose_unsubs && verbose_subs))) {
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); if (metadata)
metadata->add_ref();
pending_metadata.push_back(metadata);
pending_flags.push_back(0); pending_flags.push_back(0);
} }
} }
...@@ -125,7 +130,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -125,7 +130,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
else { else {
// Process user message coming upstream from xsub socket // Process user message coming upstream from xsub socket
pending_data.push_back (blob_t (data, size)); pending_data.push_back (blob_t (data, size));
pending_metadata.push_back (sub.metadata ()); if (metadata)
metadata->add_ref();
pending_metadata.push_back (metadata);
pending_flags.push_back (sub.flags ()); pending_flags.push_back (sub.flags ());
} }
sub.close (); sub.close ();
...@@ -280,6 +287,8 @@ int zmq::xpub_t::xrecv (msg_t *msg_) ...@@ -280,6 +287,8 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
// set metadata only if there is some // set metadata only if there is some
if (metadata_t* metadata = pending_metadata.front ()) { if (metadata_t* metadata = pending_metadata.front ()) {
msg_->set_metadata (metadata); msg_->set_metadata (metadata);
// Remove ref corresponding to vector placement
metadata->drop_ref();
} }
msg_->set_flags (pending_flags.front ()); msg_->set_flags (pending_flags.front ());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment