/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
32

33
void setUp ()
34
{
35 36 37 38 39 40 41
    setup_test_context ();
}

//  Per-test fixture hook for the Unity runner: releases the shared test
//  context by delegating to teardown_test_context ().
void tearDown ()
{
    teardown_test_context ();
}
42

43 44
void test_basic ()
{
45
    //  Create a publisher
46
    void *pub = test_context_socket (ZMQ_XPUB);
47
    int manual = 1;
48 49 50
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
51 52

    //  Create a subscriber
53 54
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
55

56
    //  Subscribe for A
57 58
    const char subscription[] = {1, 'A', 0};
    send_string_expect_success (sub, subscription, 0);
59 60

    // Receive subscriptions from subscriber
61
    recv_string_expect_success (pub, subscription, 0);
62 63

    // Subscribe socket for B instead
64
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
65 66

    // Sending A message and B Message
67 68
    send_string_expect_success (pub, "A", 0);
    send_string_expect_success (pub, "B", 0);
69

70
    recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
71 72

    //  Clean up.
73 74
    test_context_socket_close (pub);
    test_context_socket_close (sub);
75
}
76

77
void test_unsubscribe_manual ()
78 79
{
    //  Create a publisher
80 81
    void *pub = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
82 83 84

    //  set pub socket options
    int manual = 1;
85 86
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, sizeof (manual)));
87 88

    //  Create a subscriber
89 90
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
91 92

    //  Subscribe for A
93 94
    const uint8_t subscription1[] = {1, 'A'};
    send_array_expect_success (sub, subscription1, 0);
95 96

    //  Subscribe for B
97 98
    const uint8_t subscription2[] = {1, 'B'};
    send_array_expect_success (sub, subscription2, 0);
99 100 101 102

    char buffer[3];

    // Receive subscription "A" from subscriber
103
    recv_array_expect_success (pub, subscription1, 0);
104 105

    // Subscribe socket for XA instead
106
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
107 108

    // Receive subscription "B" from subscriber
109
    recv_array_expect_success (pub, subscription2, 0);
110 111

    // Subscribe socket for XB instead
112
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
113 114

    //  Unsubscribe from A
115 116
    const uint8_t unsubscription1[2] = {0, 'A'};
    send_array_expect_success (sub, unsubscription1, 0);
117 118

    // Receive unsubscription "A" from subscriber
119
    recv_array_expect_success (pub, unsubscription1, 0);
120 121

    // Unsubscribe socket from XA instead
122
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
123 124

    // Sending messages XA, XB
125 126
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
127 128

    // Subscriber should receive XB only
129
    recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
130 131

    // Close subscriber
132
    test_context_socket_close (sub);
133 134

    // Receive unsubscription "B"
135 136 137 138 139 140
    const char unsubscription2[2] = {0, 'B'};
    TEST_ASSERT_EQUAL_INT (
      sizeof unsubscription2,
      TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
    TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
                                  sizeof unsubscription2);
141 142

    // Unsubscribe socket from XB instead
143
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
144 145

    //  Clean up.
146
    test_context_socket_close (pub);
147 148
}

149
void test_xpub_proxy_unsubscribe_on_disconnect ()
150
{
151 152
    const uint8_t topic_buff[] = {"1"};
    const uint8_t payload_buff[] = {"X"};
153

154 155 156
    char my_endpoint_backend[MAX_SOCKET_STRING];
    char my_endpoint_frontend[MAX_SOCKET_STRING];

157 158 159
    int manual = 1;

    // proxy frontend
160 161 162
    void *xsub_proxy = test_context_socket (ZMQ_XSUB);
    bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
                        sizeof my_endpoint_frontend);
163 164

    // proxy backend
165 166 167 168 169
    void *xpub_proxy = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
                        sizeof my_endpoint_backend);
170 171

    // publisher
172 173
    void *pub = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
174 175

    // first subscriber subscribes
176 177 178 179
    void *sub1 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
180 181

    // wait
182
    msleep (SETTLE_TIME);
183 184

    // proxy reroutes and confirms subscriptions
185 186 187 188 189
    const uint8_t subscription[2] = {1, *topic_buff};
    recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, subscription, 0);
190 191

    // second subscriber subscribes
192 193 194 195
    void *sub2 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
196 197

    // wait
198
    msleep (SETTLE_TIME);
199 200

    // proxy reroutes
201 202 203 204
    recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, subscription, 0);
205 206

    // wait
207
    msleep (SETTLE_TIME);
208 209

    // let publisher send a msg
210 211
    send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (pub, payload_buff, 0);
212 213

    // wait
214
    msleep (SETTLE_TIME);
215 216

    // proxy reroutes data messages to subscribers
217 218 219 220
    recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
    send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (xpub_proxy, payload_buff, 0);
221 222

    // wait
223
    msleep (SETTLE_TIME);
224 225

    // each subscriber should now get a message
226 227
    recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
228

229 230
    recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
231 232

    //  Disconnect both subscribers
233 234
    test_context_socket_close (sub1);
    test_context_socket_close (sub2);
235 236

    // wait
237
    msleep (SETTLE_TIME);
238 239

    // unsubscribe messages are passed from proxy to publisher
240 241 242 243 244
    const uint8_t unsubscription[] = {0, *topic_buff};
    recv_array_expect_success (xpub_proxy, unsubscription, 0);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, unsubscription, 0);
245 246

    // should receive another unsubscribe msg
247 248 249 250
    recv_array_expect_success (xpub_proxy, unsubscription, 0);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, unsubscription, 0);
251 252

    // wait
253
    msleep (SETTLE_TIME);
254 255

    // let publisher send a msg
256 257
    send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (pub, payload_buff, 0);
258

259
    // wait
260
    msleep (SETTLE_TIME);
261 262

    // nothing should come to the proxy
263 264 265
    char buffer[1];
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
266

267 268 269
    test_context_socket_close (pub);
    test_context_socket_close (xpub_proxy);
    test_context_socket_close (xsub_proxy);
270 271
}

272
void test_missing_subscriptions ()
273
{
274 275 276
    const char *topic1 = "1";
    const char *topic2 = "2";
    const char *payload = "X";
277

278 279 280
    char my_endpoint_backend[MAX_SOCKET_STRING];
    char my_endpoint_frontend[MAX_SOCKET_STRING];

281 282 283
    int manual = 1;

    // proxy frontend
284 285 286
    void *xsub_proxy = test_context_socket (ZMQ_XSUB);
    bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
                        sizeof my_endpoint_frontend);
287 288

    // proxy backend
289 290 291 292 293
    void *xpub_proxy = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
                        sizeof my_endpoint_backend);
294 295

    // publisher
296 297
    void *pub = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
298 299 300 301 302 303

    // Here's the problem: because subscribers subscribe in quick succession,
    // the proxy is unable to confirm the first subscription before receiving
    // the second. This causes the first subscription to get lost.

    // first subscriber
304 305 306
    void *sub1 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
307 308

    // second subscriber
309 310 311
    void *sub2 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
312 313

    // wait
314
    msleep (SETTLE_TIME);
315 316

    // proxy now reroutes and confirms subscriptions
317 318 319 320 321 322 323 324 325 326 327
    const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
    recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
    send_array_expect_success (xsub_proxy, subscription1, 0);

    const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
    recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
    send_array_expect_success (xsub_proxy, subscription2, 0);
328 329

    // wait
330
    msleep (SETTLE_TIME);
331

332 333 334 335 336
    // let publisher send 2 msgs, each with its own topic_buff
    send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
    send_string_expect_success (pub, payload, 0);
    send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
    send_string_expect_success (pub, payload, 0);
337 338

    // wait
339
    msleep (SETTLE_TIME);
340 341

    // proxy reroutes data messages to subscribers
342 343 344 345 346 347 348 349 350
    recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
    recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
    send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
    send_string_expect_success (xpub_proxy, payload, 0);

    recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
    recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
    send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
    send_string_expect_success (xpub_proxy, payload, 0);
351 352

    // wait
353
    msleep (SETTLE_TIME);
354 355

    // each subscriber should now get a message
356 357
    recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
    recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
358

359 360
    recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
    recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
361 362

    //  Clean up
363 364 365 366 367
    test_context_socket_close (sub1);
    test_context_socket_close (sub2);
    test_context_socket_close (pub);
    test_context_socket_close (xpub_proxy);
    test_context_socket_close (xsub_proxy);
368 369
}

370
void test_unsubscribe_cleanup ()
371 372 373 374
{
    char my_endpoint[MAX_SOCKET_STRING];

    //  Create a publisher
375
    void *pub = test_context_socket (ZMQ_XPUB);
376
    int manual = 1;
377 378 379
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
380 381

    //  Create a subscriber
382 383
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
384 385

    //  Subscribe for A
386 387
    const uint8_t subscription1[2] = {1, 'A'};
    send_array_expect_success (sub, subscription1, 0);
388 389 390


    // Receive subscriptions from subscriber
391 392
    recv_array_expect_success (pub, subscription1, 0);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
393 394

    // send 2 messages
395 396
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
397 398

    // receive the single message
399
    recv_string_expect_success (sub, "XA", 0);
400 401

    // should be nothing left in the queue
402 403 404
    char buffer[2];
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
405 406

    // close the socket
407
    test_context_socket_close (sub);
408 409

    // closing the socket will result in an unsubscribe event
410 411
    const uint8_t unsubscription[2] = {0, 'A'};
    recv_array_expect_success (pub, unsubscription, 0);
412 413 414

    // this doesn't really do anything
    // there is no last_pipe set it will just fail silently
415
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
416 417

    // reconnect
418 419
    sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
420 421

    // send a subscription for B
422 423
    const uint8_t subscription2[2] = {1, 'B'};
    send_array_expect_success (sub, subscription2, 0);
424 425

    // receive the subscription, overwrite it to XB
426 427
    recv_array_expect_success (pub, subscription2, 0);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
428 429

    // send 2 messages
430 431
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
432 433

    // receive the single message
434
    recv_string_expect_success (sub, "XB", 0);
435 436

    // should be nothing left in the queue
437 438
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
439 440

    //  Clean up.
441 442
    test_context_socket_close (pub);
    test_context_socket_close (sub);
443 444
}

445
int main ()
446 447 448
{
    setup_test_environment ();

449 450 451 452 453 454 455 456
    UNITY_BEGIN ();
    RUN_TEST (test_basic);
    RUN_TEST (test_unsubscribe_manual);
    RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
    RUN_TEST (test_missing_subscriptions);
    RUN_TEST (test_unsubscribe_cleanup);

    return UNITY_END ();
457
}