Commit 813c7381 authored by Fedor Sheremetyev's avatar Fedor Sheremetyev

Add test for consistent unsubscription in XPUB manual mode.

Expect custom messages on both explicit unsubscription and pipe termination.
parent a343059a
......@@ -90,6 +90,112 @@ int test_basic()
return 0 ;
}
int test_unsubscribe_manual()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_XPUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);
// set pub socket options
int manual = 1;
rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4);
assert (rc == 0);
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_XSUB);
assert (sub);
rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0);
// Subscribe for A
char subscription1[2] = { 1, 'A'};
rc = zmq_send_const(sub, subscription1, 2, 0);
assert (rc == 2);
// Subscribe for B
char subscription2[2] = { 1, 'B'};
rc = zmq_send_const(sub, subscription2, 2, 0);
assert (rc == 2);
char buffer[3];
// Receive subscription "A" from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 2);
assert(buffer[0] == 1);
assert(buffer[1] == 'A');
// Subscribe socket for XA instead
rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XA", 2);
assert(rc == 0);
// Receive subscription "B" from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 2);
assert(buffer[0] == 1);
assert(buffer[1] == 'B');
// Subscribe socket for XB instead
rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XB", 2);
assert(rc == 0);
// Unsubscribe from A
char unsubscription1[2] = { 0, 'A'};
rc = zmq_send_const(sub, unsubscription1, 2, 0);
assert (rc == 2);
// Receive unsubscription "A" from subscriber
rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 2);
assert(buffer[0] == 0);
assert(buffer[1] == 'A');
// Unsubscribe socket from XA instead
rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XA", 2);
assert(rc == 0);
// Sending messages XA, XB
rc = zmq_send_const(pub, "XA", 2, 0);
assert(rc == 2);
rc = zmq_send_const(pub, "XB", 2, 0);
assert(rc == 2);
// Subscriber should receive XB only
rc = zmq_recv(sub, buffer, 2, ZMQ_DONTWAIT);
assert(rc == 2);
assert(buffer[0] == 'X');
assert(buffer[1] == 'B');
// Close subscriber
rc = zmq_close (sub);
assert (rc == 0);
// Receive unsubscription "B"
rc = zmq_recv(pub, buffer, 2, 0);
assert(rc == 2);
assert(buffer[0] == 0);
assert(buffer[1] == 'B');
// Unsubscribe socket from XB instead
rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XB", 2);
assert(rc == 0);
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
const char *backend)
{
......@@ -349,6 +455,7 @@ int main(void)
{
setup_test_environment ();
test_basic ();
test_unsubscribe_manual ();
const char *frontend;
const char *backend;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment