Commit 391bc12d authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1539 from djelenc/issue1116

Addresses zeromq/libzmq#1116.
parents 3f326b10 b9634e11
...@@ -92,6 +92,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -92,6 +92,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{ {
last_pipe = pipe_; last_pipe = pipe_;
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata());
pending_flags.push_back(0); pending_flags.push_back(0);
} }
else else
...@@ -108,6 +109,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -108,6 +109,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
(*data == 0 && verbose_unsubs && verbose_subs))) { (*data == 0 && verbose_unsubs && verbose_subs))) {
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata());
pending_flags.push_back(0); pending_flags.push_back(0);
} }
} }
...@@ -115,6 +117,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -115,6 +117,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
else { else {
// Process user message coming upstream from xsub socket // Process user message coming upstream from xsub socket
pending_data.push_back (blob_t (data, size)); pending_data.push_back (blob_t (data, size));
pending_metadata.push_back (sub.metadata ());
pending_flags.push_back (sub.flags ()); pending_flags.push_back (sub.flags ());
} }
sub.close (); sub.close ();
...@@ -241,8 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_) ...@@ -241,8 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
memcpy (msg_->data (), memcpy (msg_->data (),
pending_data.front ().data (), pending_data.front ().data (),
pending_data.front ().size ()); pending_data.front ().size ());
msg_->set_metadata (pending_metadata.front ());
msg_->set_flags (pending_flags.front ()); msg_->set_flags (pending_flags.front ());
pending_data.pop_front (); pending_data.pop_front ();
pending_metadata.pop_front ();
pending_flags.pop_front (); pending_flags.pop_front ();
return 0; return 0;
} }
...@@ -265,6 +270,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -265,6 +270,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
if (size_ > 0) if (size_ > 0)
memcpy (&unsub [1], data_, size_); memcpy (&unsub [1], data_, size_);
self->pending_data.push_back (unsub); self->pending_data.push_back (unsub);
self->pending_metadata.push_back (NULL);
self->pending_flags.push_back (0); self->pending_flags.push_back (0);
} }
} }
...@@ -109,6 +109,7 @@ namespace zmq ...@@ -109,6 +109,7 @@ namespace zmq
// applied to the trie, but not yet received by the user. // applied to the trie, but not yet received by the user.
typedef std::basic_string <unsigned char> blob_t; typedef std::basic_string <unsigned char> blob_t;
std::deque <blob_t> pending_data; std::deque <blob_t> pending_data;
std::deque <metadata_t*> pending_metadata;
std::deque <unsigned char> pending_flags; std::deque <unsigned char> pending_flags;
xpub_t (const xpub_t&); xpub_t (const xpub_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment