Commit a4bedc52 authored by Pieter Hintjens's avatar Pieter Hintjens

Whitespace and comment fixes

parent da0efaa8
...@@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
} }
else /*process message unrelated to sub/unsub*/ { else
// Process user message coming upstream from xsub socket
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
}
sub.close (); sub.close ();
} }
...@@ -177,7 +177,6 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -177,7 +177,6 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
xpub_t *self = (xpub_t*) arg_; xpub_t *self = (xpub_t*) arg_;
if (self->options.type != ZMQ_PUB) { if (self->options.type != ZMQ_PUB) {
// Place the unsubscription to the queue of pending (un)sunscriptions // Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on. // to be retrived by the user later on.
blob_t unsub (size_ + 1, 0); blob_t unsub (size_ + 1, 0);
......
...@@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t () ...@@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t ()
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{ {
// icanhasall_ is unused // icanhasall_ is unused
(void)icanhasall_; (void) icanhasall_;
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
...@@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data (); unsigned char *data = (unsigned char*) msg_->data ();
// Process the subscription.
if (*data == 1) { if (*data == 1) {
// this used to filter out duplicate subscriptions, // Process subscribe message
// This used to filter out duplicate subscriptions,
// however this is alread done on the XPUB side and // however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE // doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved // when there are forwarding devices involved.
subscriptions.add (data + 1, size - 1); subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else if (*data == 0) { else
if (*data == 0) {
// Process unsubscribe message
if (subscriptions.rm (data + 1, size - 1)) if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else /*upstream message unrelated to sub/unsub*/ { else
// User message sent upstream to XPUB socket
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment