Commit 0880d5b8 authored by Martin Hurton's avatar Martin Hurton

Merge pull request #547 from hintjens/master

Fixed issue LIBZMQ-525
parents d0c58d24 f0cf4095
...@@ -56,9 +56,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -56,9 +56,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// There are some subscriptions waiting. Let's process them. // There are some subscriptions waiting. Let's process them.
msg_t sub; msg_t sub;
while (pipe_->read (&sub)) { while (pipe_->read (&sub)) {
// Apply the subscription to the trie
// Apply the subscription to the trie. unsigned char *const data = (unsigned char *) sub.data ();
unsigned char *const data = (unsigned char*) sub.data ();
const size_t size = sub.size (); const size_t size = sub.size ();
if (size > 0 && (*data == 0 || *data == 1)) { if (size > 0 && (*data == 0 || *data == 1)) {
bool unique; bool unique;
...@@ -69,13 +68,16 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -69,13 +68,16 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// If the subscription is not a duplicate store it so that it can be // If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.) // passed to used on next recv call. (Unsubscribe is not verbose.)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
pending.push_back (blob_t (data, size)); pending_data.push_back (blob_t (data, size));
pending_flags.push_back (0);
}
} }
else else {
// Process user message coming upstream from xsub socket // Process user message coming upstream from xsub socket
pending.push_back (blob_t (data, size)); pending_data.push_back (blob_t (data, size));
pending_flags.push_back (sub.flags ());
}
sub.close (); sub.close ();
} }
} }
...@@ -149,24 +151,27 @@ bool zmq::xpub_t::xhas_out () ...@@ -149,24 +151,27 @@ bool zmq::xpub_t::xhas_out ()
int zmq::xpub_t::xrecv (msg_t *msg_) int zmq::xpub_t::xrecv (msg_t *msg_)
{ {
// If there is at least one // If there is at least one
if (pending.empty ()) { if (pending_data.empty ()) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init_size (pending.front ().size ()); rc = msg_->init_size (pending_data.front ().size ());
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (msg_->data (), pending.front ().data (), memcpy (msg_->data (),
pending.front ().size ()); pending_data.front ().data (),
pending.pop_front (); pending_data.front ().size ());
msg_->set_flags (pending_flags.front ());
pending_data.pop_front ();
pending_flags.pop_front ();
return 0; return 0;
} }
bool zmq::xpub_t::xhas_in () bool zmq::xpub_t::xhas_in ()
{ {
return !pending.empty (); return !pending_data.empty ();
} }
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
...@@ -180,7 +185,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -180,7 +185,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
blob_t unsub (size_ + 1, 0); blob_t unsub (size_ + 1, 0);
unsub [0] = 0; unsub [0] = 0;
memcpy (&unsub [1], data_, size_); memcpy (&unsub [1], data_, size_);
self->pending.push_back (unsub); self->pending_data.push_back (unsub);
self->pending_flags.push_back (0);
} }
} }
......
...@@ -82,8 +82,8 @@ namespace zmq ...@@ -82,8 +82,8 @@ namespace zmq
// List of pending (un)subscriptions, ie. those that were already // List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user. // applied to the trie, but not yet received by the user.
typedef std::basic_string <unsigned char> blob_t; typedef std::basic_string <unsigned char> blob_t;
typedef std::deque <blob_t> pending_t; std::deque <blob_t> pending_data;
pending_t pending; std::deque <unsigned char> pending_flags;
xpub_t (const xpub_t&); xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&); const xpub_t &operator = (const xpub_t&);
......
...@@ -83,7 +83,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_) ...@@ -83,7 +83,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
int zmq::xsub_t::xsend (msg_t *msg_) int zmq::xsub_t::xsend (msg_t *msg_)
{ {
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data (); unsigned char *data = (unsigned char *) msg_->data ();
if (size > 0 && *data == 1) { if (size > 0 && *data == 1) {
// Process subscribe message // Process subscribe message
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment