/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#include "testutil_unity.hpp"

//  Macro supplied by the test utility headers (definition not shown in this
//  file); presumably it provides the per-test setup/teardown that backs
//  test_context_socket () and test_context_socket_close () -- TODO confirm.
SETUP_TEARDOWN_TESTCONTEXT
void test_basic ()
{
37
    //  Create a publisher
38
    void *pub = test_context_socket (ZMQ_XPUB);
39
    int manual = 1;
40 41 42
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
43 44

    //  Create a subscriber
45 46
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
47

48
    //  Subscribe for A
49 50
    const char subscription[] = {1, 'A', 0};
    send_string_expect_success (sub, subscription, 0);
51 52

    // Receive subscriptions from subscriber
53
    recv_string_expect_success (pub, subscription, 0);
54 55

    // Subscribe socket for B instead
56
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
57 58

    // Sending A message and B Message
59 60
    send_string_expect_success (pub, "A", 0);
    send_string_expect_success (pub, "B", 0);
61

62
    recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
63 64

    //  Clean up.
65 66
    test_context_socket_close (pub);
    test_context_socket_close (sub);
67
}
void test_unsubscribe_manual ()
70 71
{
    //  Create a publisher
72 73
    void *pub = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
74 75 76

    //  set pub socket options
    int manual = 1;
77 78
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, sizeof (manual)));
79 80

    //  Create a subscriber
81 82
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
83 84

    //  Subscribe for A
85 86
    const uint8_t subscription1[] = {1, 'A'};
    send_array_expect_success (sub, subscription1, 0);
87 88

    //  Subscribe for B
89 90
    const uint8_t subscription2[] = {1, 'B'};
    send_array_expect_success (sub, subscription2, 0);
91 92 93 94

    char buffer[3];

    // Receive subscription "A" from subscriber
95
    recv_array_expect_success (pub, subscription1, 0);
96 97

    // Subscribe socket for XA instead
98
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
99 100

    // Receive subscription "B" from subscriber
101
    recv_array_expect_success (pub, subscription2, 0);
102 103

    // Subscribe socket for XB instead
104
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
105 106

    //  Unsubscribe from A
107 108
    const uint8_t unsubscription1[2] = {0, 'A'};
    send_array_expect_success (sub, unsubscription1, 0);
109 110

    // Receive unsubscription "A" from subscriber
111
    recv_array_expect_success (pub, unsubscription1, 0);
112 113

    // Unsubscribe socket from XA instead
114
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
115 116

    // Sending messages XA, XB
117 118
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
119 120

    // Subscriber should receive XB only
121
    recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
122 123

    // Close subscriber
124
    test_context_socket_close (sub);
125 126

    // Receive unsubscription "B"
127 128 129 130 131 132
    const char unsubscription2[2] = {0, 'B'};
    TEST_ASSERT_EQUAL_INT (
      sizeof unsubscription2,
      TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
    TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
                                  sizeof unsubscription2);
133 134

    // Unsubscribe socket from XB instead
135
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
136 137

    //  Clean up.
138
    test_context_socket_close (pub);
139 140
}

void test_xpub_proxy_unsubscribe_on_disconnect ()
142
{
143 144
    const uint8_t topic_buff[] = {"1"};
    const uint8_t payload_buff[] = {"X"};
145

146 147 148
    char my_endpoint_backend[MAX_SOCKET_STRING];
    char my_endpoint_frontend[MAX_SOCKET_STRING];

149 150 151
    int manual = 1;

    // proxy frontend
152 153 154
    void *xsub_proxy = test_context_socket (ZMQ_XSUB);
    bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
                        sizeof my_endpoint_frontend);
155 156

    // proxy backend
157 158 159 160 161
    void *xpub_proxy = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
                        sizeof my_endpoint_backend);
162 163

    // publisher
164 165
    void *pub = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
166 167

    // first subscriber subscribes
168 169 170 171
    void *sub1 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
172 173

    // wait
174
    msleep (SETTLE_TIME);
175 176

    // proxy reroutes and confirms subscriptions
177 178 179 180 181
    const uint8_t subscription[2] = {1, *topic_buff};
    recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, subscription, 0);
182 183

    // second subscriber subscribes
184 185 186 187
    void *sub2 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
188 189

    // wait
190
    msleep (SETTLE_TIME);
191 192

    // proxy reroutes
193 194 195 196
    recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, subscription, 0);
197 198

    // wait
199
    msleep (SETTLE_TIME);
200 201

    // let publisher send a msg
202 203
    send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (pub, payload_buff, 0);
204 205

    // wait
206
    msleep (SETTLE_TIME);
207 208

    // proxy reroutes data messages to subscribers
209 210 211 212
    recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
    send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (xpub_proxy, payload_buff, 0);
213 214

    // wait
215
    msleep (SETTLE_TIME);
216 217

    // each subscriber should now get a message
218 219
    recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
220

221 222
    recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
    recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
223 224

    //  Disconnect both subscribers
225 226
    test_context_socket_close (sub1);
    test_context_socket_close (sub2);
227 228

    // wait
229
    msleep (SETTLE_TIME);
230 231

    // unsubscribe messages are passed from proxy to publisher
232 233 234 235 236
    const uint8_t unsubscription[] = {0, *topic_buff};
    recv_array_expect_success (xpub_proxy, unsubscription, 0);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, unsubscription, 0);
237 238

    // should receive another unsubscribe msg
239 240 241 242
    recv_array_expect_success (xpub_proxy, unsubscription, 0);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
    send_array_expect_success (xsub_proxy, unsubscription, 0);
243 244

    // wait
245
    msleep (SETTLE_TIME);
246 247

    // let publisher send a msg
248 249
    send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
    send_array_expect_success (pub, payload_buff, 0);
250

251
    // wait
252
    msleep (SETTLE_TIME);
253 254

    // nothing should come to the proxy
255 256 257
    char buffer[1];
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
258

259 260 261
    test_context_socket_close (pub);
    test_context_socket_close (xpub_proxy);
    test_context_socket_close (xsub_proxy);
262 263
}

void test_missing_subscriptions ()
265
{
266 267 268
    const char *topic1 = "1";
    const char *topic2 = "2";
    const char *payload = "X";
269

270 271 272
    char my_endpoint_backend[MAX_SOCKET_STRING];
    char my_endpoint_frontend[MAX_SOCKET_STRING];

273 274 275
    int manual = 1;

    // proxy frontend
276 277 278
    void *xsub_proxy = test_context_socket (ZMQ_XSUB);
    bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
                        sizeof my_endpoint_frontend);
279 280

    // proxy backend
281 282 283 284 285
    void *xpub_proxy = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
                        sizeof my_endpoint_backend);
286 287

    // publisher
288 289
    void *pub = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
290 291 292 293 294 295

    // Here's the problem: because subscribers subscribe in quick succession,
    // the proxy is unable to confirm the first subscription before receiving
    // the second. This causes the first subscription to get lost.

    // first subscriber
296 297 298
    void *sub1 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
299 300

    // second subscriber
301 302 303
    void *sub2 = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
304 305

    // wait
306
    msleep (SETTLE_TIME);
307 308

    // proxy now reroutes and confirms subscriptions
309 310 311 312 313 314 315 316 317 318 319
    const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
    recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
    send_array_expect_success (xsub_proxy, subscription1, 0);

    const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
    recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
    send_array_expect_success (xsub_proxy, subscription2, 0);
320 321

    // wait
322
    msleep (SETTLE_TIME);
323

324 325 326 327 328
    // let publisher send 2 msgs, each with its own topic_buff
    send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
    send_string_expect_success (pub, payload, 0);
    send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
    send_string_expect_success (pub, payload, 0);
329 330

    // wait
331
    msleep (SETTLE_TIME);
332 333

    // proxy reroutes data messages to subscribers
334 335 336 337 338 339 340 341 342
    recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
    recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
    send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
    send_string_expect_success (xpub_proxy, payload, 0);

    recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
    recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
    send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
    send_string_expect_success (xpub_proxy, payload, 0);
343 344

    // wait
345
    msleep (SETTLE_TIME);
346 347

    // each subscriber should now get a message
348 349
    recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
    recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
350

351 352
    recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
    recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
353 354

    //  Clean up
355 356 357 358 359
    test_context_socket_close (sub1);
    test_context_socket_close (sub2);
    test_context_socket_close (pub);
    test_context_socket_close (xpub_proxy);
    test_context_socket_close (xsub_proxy);
360 361
}

void test_unsubscribe_cleanup ()
363 364 365 366
{
    char my_endpoint[MAX_SOCKET_STRING];

    //  Create a publisher
367
    void *pub = test_context_socket (ZMQ_XPUB);
368
    int manual = 1;
369 370 371
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
    bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
372 373

    //  Create a subscriber
374 375
    void *sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
376 377

    //  Subscribe for A
378 379
    const uint8_t subscription1[2] = {1, 'A'};
    send_array_expect_success (sub, subscription1, 0);
380 381 382


    // Receive subscriptions from subscriber
383 384
    recv_array_expect_success (pub, subscription1, 0);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
385 386

    // send 2 messages
387 388
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
389 390

    // receive the single message
391
    recv_string_expect_success (sub, "XA", 0);
392 393

    // should be nothing left in the queue
394 395 396
    char buffer[2];
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
397 398

    // close the socket
399
    test_context_socket_close (sub);
400 401

    // closing the socket will result in an unsubscribe event
402 403
    const uint8_t unsubscription[2] = {0, 'A'};
    recv_array_expect_success (pub, unsubscription, 0);
404 405 406

    // this doesn't really do anything
    // there is no last_pipe set it will just fail silently
407
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
408 409

    // reconnect
410 411
    sub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
412 413

    // send a subscription for B
414 415
    const uint8_t subscription2[2] = {1, 'B'};
    send_array_expect_success (sub, subscription2, 0);
416 417

    // receive the subscription, overwrite it to XB
418 419
    recv_array_expect_success (pub, subscription2, 0);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
420 421

    // send 2 messages
422 423
    send_string_expect_success (pub, "XA", 0);
    send_string_expect_success (pub, "XB", 0);
424 425

    // receive the single message
426
    recv_string_expect_success (sub, "XB", 0);
427 428

    // should be nothing left in the queue
429 430
    TEST_ASSERT_FAILURE_ERRNO (
      EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
431 432

    //  Clean up.
433 434
    test_context_socket_close (pub);
    test_context_socket_close (sub);
435 436
}

//  Sends a frame from an XSUB socket whose leading byte (2) marks it as
//  neither a subscribe nor an unsubscribe request, and checks that the XPUB
//  peer can read back the same bytes.
void test_user_message ()
{
    const char *const endpoint = "inproc://soname";

    //  Publisher side: XPUB bound to the inproc endpoint
    void *xpub = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, endpoint));

    //  Subscriber side: XSUB connected to the same endpoint
    void *xsub = test_context_socket (ZMQ_XSUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (xsub, endpoint));

    //  Payload that is not a subscription message (first byte is 2)
    const char user_message[] = {2, 'A', 0};
    send_string_expect_success (xsub, user_message, 0);

    //  The publisher must read back what the subscriber sent
    recv_string_expect_success (xpub, user_message, 0);

    //  Release both sockets, publisher first
    test_context_socket_close (xpub);
    test_context_socket_close (xsub);
}

int main ()
460 461 462
{
    setup_test_environment ();

463 464 465 466 467 468
    UNITY_BEGIN ();
    RUN_TEST (test_basic);
    RUN_TEST (test_unsubscribe_manual);
    RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
    RUN_TEST (test_missing_subscriptions);
    RUN_TEST (test_unsubscribe_cleanup);
469
    RUN_TEST (test_user_message);
470 471

    return UNITY_END ();
472
}