Commit 3536c4b9 authored by Luca Boccassi's avatar Luca Boccassi

Problem: XPUB_MANUAL subscriptions not removed on peer term

Solution: remove the pipe from the real trie when a peer disconnects.
Also add a unit test that exercises the behaviour by reconnecting
a different socket and sending a message that matches.
Fixes #2601 and introduced by #2042
parent 8e202798
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "macros.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
...@@ -203,6 +204,13 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, ...@@ -203,6 +204,13 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
static void stub (unsigned char *data_, size_t size_, void *arg_)
{
LIBZMQ_UNUSED(data_);
LIBZMQ_UNUSED(size_);
LIBZMQ_UNUSED(arg_);
}
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
{ {
if (manual) if (manual)
...@@ -210,6 +218,10 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) ...@@ -210,6 +218,10 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
// Remove the pipe from the trie and send corresponding manual // Remove the pipe from the trie and send corresponding manual
// unsubscriptions upstream. // unsubscriptions upstream.
manual_subscriptions.rm (pipe_, send_unsubscription, this, false); manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
// Remove pipe without actually sending the message as it was taken
// care of by the manual call above. subscriptions is the real mtrie,
// so the pipe must be removed from there or it will be left over.
subscriptions.rm (pipe_, stub, NULL, false);
} }
else else
{ {
......
...@@ -466,6 +466,124 @@ int test_missing_subscriptions(void) ...@@ -466,6 +466,124 @@ int test_missing_subscriptions(void)
} }
int test_unsubscribe_cleanup (void)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int manual = 1;
int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
rc = zmq_bind (pub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);
// Subscribe for A
char subscription[2] = { 1, 'A'};
rc = zmq_send_const (sub, subscription, 2, 0);
assert (rc == 2);
char buffer[2];
// Receive subscriptions from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert (buffer[1] == 'A');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2);
assert (rc == 0);
// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);
// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'A');
// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);
// close the socket
rc = zmq_close (sub);
assert (rc == 0);
// closing the socket will result in an unsubscribe event
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 0);
assert (buffer[1] == 'A');
// this doesn't really do anything
// there is no last_pipe set it will just fail silently
rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert (rc == 0);
// reconnect
sub = zmq_socket (ctx, ZMQ_XSUB);
rc = zmq_connect (sub, my_endpoint);
assert (rc == 0);
// send a subscription for B
subscription[0] = 1;
subscription[1] = 'B';
rc = zmq_send (sub, subscription, 2, 0);
assert (rc == 2);
// receive the subscription, overwrite it to XB
rc = zmq_recv (pub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 1);
assert(buffer[1] == 'B');
rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2);
assert (rc == 0);
// send 2 messages
rc = zmq_send_const (pub, "XA", 2, 0);
assert (rc == 2);
rc = zmq_send_const (pub, "XB", 2, 0);
assert (rc == 2);
// receive the single message
rc = zmq_recv (sub, buffer, 2, 0);
assert (rc == 2);
assert (buffer[0] == 'X');
assert (buffer[1] == 'B'); // this assertion will fail
// should be nothing left in the queue
rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT);
assert (rc == -1);
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
int main(void) int main(void)
{ {
setup_test_environment (); setup_test_environment ();
...@@ -473,6 +591,7 @@ int main(void) ...@@ -473,6 +591,7 @@ int main(void)
test_unsubscribe_manual (); test_unsubscribe_manual ();
test_xpub_proxy_unsubscribe_on_disconnect (); test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions (); test_missing_subscriptions ();
test_unsubscribe_cleanup ();
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment