Commit 5a854780 authored by Andrij Abyzov's avatar Andrij Abyzov

Problem: cannot send arbitrary data from XSUB to XPUB.

Solution: now if the first frame in a multipart message is not subscribe/unsubscribe,
the rest of the parts are also considered to be not subscribe/unsubscribe.
parent 85df7558
......@@ -1154,6 +1154,22 @@ Default value:: NULL
Applicable socket types:: ZMQ_XPUB
ZMQ_ONLY_FIRST_SUBSCRIBE: Process only fist subscribe/unsubscribe in a multipart message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set, only the first part of the multipart message is processed as
a subscribe/unsubscribe message. The rest are forwarded as user data
regardless of message contents.
It not set (default), subscribe/unsubscribe messages in a multipart message
are processed as such regardless of their number and order.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: ZMQ_XSUB, ZMQ_XPUB
ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the domain for ZAP (ZMQ RFC 27) authentication. A ZAP domain must be
......
......@@ -675,6 +675,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_WSS_TRUST_PEM 105
#define ZMQ_WSS_HOSTNAME 106
#define ZMQ_WSS_TRUST_SYSTEM 107
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
/* DRAFT Context options */
......
......@@ -43,6 +43,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
_verbose_unsubs (false),
_more_send (false),
_more_recv (false),
_process_subscribe (false),
_only_first_subscribe (false),
_lossy (true),
_manual (false),
_send_last_pipe (false),
......@@ -101,7 +103,10 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
bool subscribe = false;
bool is_subscribe_or_cancel = false;
if (!_more_recv) {
bool first_part = !_more_recv;
_more_recv = (msg.flags () & msg_t::more) != 0;
if (first_part || _process_subscribe) {
// Apply the subscription to the trie
if (msg.is_subscribe () || msg.is_cancel ()) {
data = static_cast<unsigned char *> (msg.command_body ());
......@@ -116,7 +121,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
}
}
_more_recv = (msg.flags () & msg_t::more) != 0;
if (first_part)
_process_subscribe =
!_only_first_subscribe || is_subscribe_or_cancel;
if (!is_subscribe_or_cancel) {
// Process user message coming upstream from xsub socket
......@@ -199,7 +206,7 @@ int zmq::xpub_t::xsetsockopt (int option_,
{
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
|| option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
|| option_ == ZMQ_XPUB_MANUAL) {
|| option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
if (optvallen_ != sizeof (int)
|| *static_cast<const int *> (optval_) < 0) {
errno = EINVAL;
......@@ -218,6 +225,8 @@ int zmq::xpub_t::xsetsockopt (int option_,
_lossy = (*static_cast<const int *> (optval_) == 0);
else if (option_ == ZMQ_XPUB_MANUAL)
_manual = (*static_cast<const int *> (optval_) != 0);
else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE)
_only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
} else if (option_ == ZMQ_SUBSCRIBE && _manual) {
if (_last_pipe != NULL)
_subscriptions.add ((unsigned char *) optval_, optvallen_,
......
......@@ -96,6 +96,15 @@ class xpub_t : public socket_base_t
// True if we are in the middle of receiving a multi-part message.
bool _more_recv;
// If true, subscribe and cancel messages are processed for the rest
// of multipart message.
bool _process_subscribe;
// This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
// If true, messages following subscribe/unsubscribe in a multipart
// message are treated as user data regardless of the first byte.
bool _only_first_subscribe;
// Drop messages if HWM reached, otherwise return with EAGAIN
bool _lossy;
......
......@@ -38,7 +38,9 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
_has_message (false),
_more_send (false),
_more_recv (false)
_more_recv (false),
_process_subscribe (false),
_only_first_subscribe (false)
{
options.type = ZMQ_XSUB;
......@@ -95,17 +97,38 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
pipe_->flush ();
}
int zmq::xsub_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
if (optvallen_ != sizeof (int)
|| *static_cast<const int *> (optval_) < 0) {
errno = EINVAL;
return -1;
}
_only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
return 0;
} else {
errno = EINVAL;
return -1;
}
}
int zmq::xsub_t::xsend (msg_t *msg_)
{
size_t size = msg_->size ();
unsigned char *data = static_cast<unsigned char *> (msg_->data ());
bool send_more = _more_send;
bool first_part = !_more_send;
_more_send = (msg_->flags () & msg_t::more) != 0;
if (send_more)
if (first_part) {
_process_subscribe = !_only_first_subscribe;
} else if (!_process_subscribe) {
// User message sent upstream to XPUB socket
return _dist.send_to_all (msg_);
}
if (msg_->is_subscribe () || (size > 0 && *data == 1)) {
// Process subscribe message
......@@ -121,6 +144,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size = size - 1;
}
_subscriptions.add (data, size);
_process_subscribe = true;
return _dist.send_to_all (msg_);
}
if (msg_->is_cancel () || (size > 0 && *data == 0)) {
......@@ -132,6 +156,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
data = data + 1;
size = size - 1;
}
_process_subscribe = true;
if (_subscriptions.rm (data, size))
return _dist.send_to_all (msg_);
} else
......
......@@ -57,6 +57,7 @@ class xsub_t : public socket_base_t
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
......@@ -101,6 +102,15 @@ class xsub_t : public socket_base_t
// there are following parts still waiting.
bool _more_recv;
// If true, subscribe and cancel messages are processed for the rest
// of multipart message.
bool _process_subscribe;
// This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
// If true, messages following subscribe/unsubscribe in a multipart
// message are treated as user data regardless of the first byte.
bool _only_first_subscribe;
xsub_t (const xsub_t &);
const xsub_t &operator= (const xsub_t &);
};
......
......@@ -57,6 +57,8 @@
#define ZMQ_SOCKS_PASSWORD 100
#define ZMQ_IN_BATCH_SIZE 101
#define ZMQ_OUT_BATCH_SIZE 102
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
......
......@@ -456,57 +456,66 @@ void test_user_message ()
test_context_socket_close (sub);
}
#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
void test_user_message_multi ()
{
const int only_first_subscribe = 1;
// Create a publisher
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_ONLY_FIRST_SUBSCRIBE,
&only_first_subscribe,
sizeof (only_first_subscribe)));
// Create a subscriber
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_ONLY_FIRST_SUBSCRIBE,
&only_first_subscribe,
sizeof (only_first_subscribe)));
// Send some data that is neither sub nor unsub
const uint8_t msg_common[] = {'A', 'B', 'C'};
// Message starts with 0 but should still treated as user
const uint8_t msg_0[] = {0, 'B', 'C'};
const uint8_t msg_0a[] = {0, 'B', 'C'};
const uint8_t msg_0b[] = {0, 'C', 'D'};
// Message starts with 1 but should still treated as user
const uint8_t msg_1[] = {1, 'B', 'C'};
const uint8_t msg_1a[] = {1, 'B', 'C'};
const uint8_t msg_1b[] = {1, 'C', 'D'};
// Test second message starting with 0
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_0, 0);
send_array_expect_success (sub, msg_0a, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_0, 0);
recv_array_expect_success (pub, msg_0a, 0);
// Test second message starting with 1
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_1, 0);
send_array_expect_success (sub, msg_1a, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_1, 0);
char buffer[255];
// Test first message starting with 0
send_array_expect_success (sub, msg_0, 0);
// wait
msleep (SETTLE_TIME);
int rc = zmq_recv (pub, buffer, sizeof (buffer), ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
recv_array_expect_success (pub, msg_1a, 0);
// Test first message starting with 1
send_array_expect_success (sub, msg_1, 0);
recv_array_expect_success (pub, msg_1, 0);
send_array_expect_success (sub, msg_1a, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_1b, 0);
recv_array_expect_success (pub, msg_1a, 0);
recv_array_expect_success (pub, msg_1b, 0);
send_array_expect_success (sub, msg_0a, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_0b, 0);
recv_array_expect_success (pub, msg_0a, 0);
recv_array_expect_success (pub, msg_0b, 0);
// Clean up.
test_context_socket_close (pub);
test_context_socket_close (sub);
}
#endif
int main ()
{
......@@ -519,7 +528,9 @@ int main ()
RUN_TEST (test_missing_subscriptions);
RUN_TEST (test_unsubscribe_cleanup);
RUN_TEST (test_user_message);
#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
RUN_TEST (test_user_message_multi);
#endif
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment