Commit c1f76e43 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #494 from jgm-radez/master

allow XSUB/XPUB to send/recv messages unrelated to sub/unsub
parents 98a91e85 d32e3922
...@@ -224,7 +224,8 @@ ZMQ_XPUB ...@@ -224,7 +224,8 @@ ZMQ_XPUB
Same as ZMQ_PUB except that you can receive subscriptions from the peers Same as ZMQ_PUB except that you can receive subscriptions from the peers
in form of incoming messages. Subscription message is a byte 1 (for in form of incoming messages. Subscription message is a byte 1 (for
subscriptions) or byte 0 (for unsubscriptions) followed by the subscription subscriptions) or byte 0 (for unsubscriptions) followed by the subscription
body. body. Messages without a sub/unsub prefix are also received, but have no
effect on subscription status.
[horizontal] [horizontal]
.Summary of ZMQ_XPUB characteristics .Summary of ZMQ_XPUB characteristics
...@@ -240,7 +241,8 @@ ZMQ_XSUB ...@@ -240,7 +241,8 @@ ZMQ_XSUB
^^^^^^^^ ^^^^^^^^
Same as ZMQ_SUB except that you subscribe by sending subscription messages to Same as ZMQ_SUB except that you subscribe by sending subscription messages to
the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 the socket. Subscription message is a byte 1 (for subscriptions) or byte 0
(for unsubscriptions) followed by the subscription body. (for unsubscriptions) followed by the subscription body. Messages without a
sub/unsub prefix may also be sent, but have no effect on subscription status.
[horizontal] [horizontal]
.Summary of ZMQ_XSUB characteristics .Summary of ZMQ_XSUB characteristics
......
...@@ -74,6 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -74,6 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
} }
else /*process message unrelated to sub/unsub*/ {
pending.push_back (blob_t (data, size));
}
sub.close (); sub.close ();
} }
......
...@@ -87,12 +87,6 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -87,12 +87,6 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data (); unsigned char *data = (unsigned char*) msg_->data ();
// Malformed subscriptions.
if (size < 1 || (*data != 0 && *data != 1)) {
errno = EINVAL;
return -1;
}
// Process the subscription. // Process the subscription.
if (*data == 1) { if (*data == 1) {
// this used to filter out duplicate subscriptions, // this used to filter out duplicate subscriptions,
...@@ -102,10 +96,13 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -102,10 +96,13 @@ int zmq::xsub_t::xsend (msg_t *msg_)
subscriptions.add (data + 1, size - 1); subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else { else if (*data == 0) {
if (subscriptions.rm (data + 1, size - 1)) if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else /*upstream message unrelated to sub/unsub*/ {
return dist.send_to_all (msg_);
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment