Commit 8fd71d63 authored by David Jelenc's avatar David Jelenc

Fixed missing subscriptions on XPUB with manual subscriptions

The patch fixes the issue #1568.
parent 47e1216e
...@@ -41,6 +41,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -41,6 +41,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
more (false), more (false),
lossy (true), lossy (true),
manual(false), manual(false),
pending_pipes (),
welcome_msg () welcome_msg ()
{ {
last_pipe = NULL; last_pipe = NULL;
...@@ -90,7 +91,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -90,7 +91,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (size > 0 && (*data == 0 || *data == 1)) { if (size > 0 && (*data == 0 || *data == 1)) {
if (manual) if (manual)
{ {
last_pipe = pipe_; pending_pipes.push_back(pipe_);
pending_data.push_back(blob_t(data, size)); pending_data.push_back(blob_t(data, size));
pending_metadata.push_back(sub.metadata()); pending_metadata.push_back(sub.metadata());
pending_flags.push_back(0); pending_flags.push_back(0);
...@@ -243,6 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_) ...@@ -243,6 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
return -1; return -1;
} }
// User is reading a message, set last_pipe and remove it from the deque
last_pipe = pending_pipes.front ();
pending_pipes.pop_front ();
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init_size (pending_data.front ().size ()); rc = msg_->init_size (pending_data.front ().size ());
...@@ -281,6 +286,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -281,6 +286,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
if (size_ > 0) if (size_ > 0)
memcpy (&unsub [1], data_, size_); memcpy (&unsub [1], data_, size_);
self->last_pipe = NULL; self->last_pipe = NULL;
self->pending_pipes.push_back (NULL);
self->pending_data.push_back (unsub); self->pending_data.push_back (unsub);
self->pending_metadata.push_back (NULL); self->pending_metadata.push_back (NULL);
self->pending_flags.push_back (0); self->pending_flags.push_back (0);
......
...@@ -99,9 +99,12 @@ namespace zmq ...@@ -99,9 +99,12 @@ namespace zmq
// Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE
bool manual; bool manual;
// Last pipe send subscription message, only used if xpub is on manual // Last pipe that sent subscription message, only used if xpub is on manual
pipe_t *last_pipe; pipe_t *last_pipe;
// Pipes that sent subscriptions messages that have not yet been processed, only used if xpub is on manual
std::deque <pipe_t*> pending_pipes;
// Welcome message to send to pipe when attached // Welcome message to send to pipe when attached
msg_t welcome_msg; msg_t welcome_msg;
......
...@@ -228,11 +228,128 @@ int test_xpub_proxy_unsubscribe_on_disconnect() ...@@ -228,11 +228,128 @@ int test_xpub_proxy_unsubscribe_on_disconnect()
return 0; return 0;
} }
int test_missing_subscriptions()
{
const char* frontend = "ipc://frontend";
const char* backend = "ipc://backend";
const char* topic1 = "1";
const char* topic2 = "2";
const char* payload = "X";
int manual = 1;
void *ctx = zmq_ctx_new ();
assert (ctx);
// proxy frontend
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub_proxy);
assert (zmq_bind (xsub_proxy, frontend) == 0);
// proxy backend
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub_proxy);
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
assert (zmq_bind (xpub_proxy, backend) == 0);
// publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (zmq_connect (pub, frontend) == 0);
// Here's the problem: because subscribers subscribe in quick succession,
// the proxy is unable to confirm the first subscription before receiving
// the second. This causes the first subscription to get lost.
// first subscriber
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
assert (sub1);
assert (zmq_connect (sub1, backend) == 0);
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0);
// second subscriber
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
assert (sub2);
assert (zmq_connect (sub2, backend) == 0);
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
// wait
assert (zmq_poll (0, 0, 100) == 0);
// proxy now reroutes and confirms subscriptions
char buffer[2];
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
assert (buffer [0] == 1);
assert (buffer [1] == *topic1);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1) == 0);
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2);
assert (buffer [0] == 1);
assert (buffer [1] == *topic2);
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1) == 0);
assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2);
// wait
assert (zmq_poll (0, 0, 100) == 0);
// let publisher send 2 msgs, each with its own topic
assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
assert (zmq_send (pub, topic2, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (pub, payload, 1, 0) == 1);
// wait
assert (zmq_poll (0, 0, 100) == 0);
// proxy reroutes data messages to subscribers
char topic_buff [1];
char data_buff [1];
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff [0] == *topic1);
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff [0] == *payload);
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff [0] == *topic2);
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff [0] == *payload);
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
// wait
assert (zmq_poll (0, 0, 100) == 0);
// each subscriber should now get a message
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff [0] == *topic2);
assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff [0] == *payload);
assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
assert (topic_buff [0] == *topic1);
assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
assert (data_buff [0] == *payload);
// Clean up
assert (zmq_close (sub1) == 0);
assert (zmq_close (sub2) == 0);
assert (zmq_close (pub) == 0);
assert (zmq_close (xpub_proxy) == 0);
assert (zmq_close (xsub_proxy) == 0);
assert (zmq_ctx_term (ctx) == 0);
return 0;
}
int main(void) int main(void)
{ {
setup_test_environment (); setup_test_environment ();
test_basic (); test_basic ();
test_xpub_proxy_unsubscribe_on_disconnect (); test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions ();
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment