test_xpub_verbose.cpp 11.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/*
    Copyright (c) 2018 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
32 33 34

void setUp ()
{
35
    setup_test_context ();
36
}
37

38 39
void tearDown ()
{
40
    teardown_test_context ();
41 42
}

43 44 45 46 47 48 49 50
const uint8_t unsubscribe_a_msg[] = {0, 'A'};
const uint8_t subscribe_a_msg[] = {1, 'A'};
const uint8_t subscribe_b_msg[] = {1, 'B'};

const char test_endpoint[] = "inproc://soname";
const char topic_a[] = "A";
const char topic_b[] = "B";

51 52
void test_xpub_verbose_one_sub ()
{
53 54
    void *pub = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
55

56 57
    void *sub = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
58 59

    //  Subscribe for A
60
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
61 62

    // Receive subscriptions from subscriber
63
    recv_array_expect_success (pub, subscribe_a_msg, 0);
64 65

    // Subscribe socket for B instead
66
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_b, 1));
67 68

    // Receive subscriptions from subscriber
69
    recv_array_expect_success (pub, subscribe_b_msg, 0);
70 71

    //  Subscribe again for A again
72
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
73 74

    //  This time it is duplicated, so it will be filtered out
75
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
76 77

    int verbose = 1;
78 79
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
80 81

    // Subscribe socket for A again
82
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
83 84

    // This time with VERBOSE the duplicated sub will be received
85
    recv_array_expect_success (pub, subscribe_a_msg, 0);
86 87

    // Sending A message and B Message
88 89
    send_string_expect_success (pub, topic_a, 0);
    send_string_expect_success (pub, topic_b, 0);
90

91 92
    recv_string_expect_success (sub, topic_a, 0);
    recv_string_expect_success (sub, topic_b, 0);
93 94

    //  Clean up.
95 96
    test_context_socket_close (pub);
    test_context_socket_close (sub);
97 98
}

99
void create_xpub_with_2_subs (void **pub_, void **sub0_, void **sub1_)
100
{
101 102 103 104 105 106 107 108
    *pub_ = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (*pub_, test_endpoint));

    *sub0_ = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub0_, test_endpoint));

    *sub1_ = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub1_, test_endpoint));
109
}
110

111 112 113
void create_duplicate_subscription (void *pub_, void *sub0_, void *sub1_)
{
    //  Subscribe for A
114 115
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub0_, ZMQ_SUBSCRIBE, topic_a, 1));
116 117

    // Receive subscriptions from subscriber
118
    recv_array_expect_success (pub_, subscribe_a_msg, 0);
119

120
    //  Subscribe again for A on the other socket
121 122
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1_, ZMQ_SUBSCRIBE, topic_a, 1));
123

124
    //  This time it is duplicated, so it will be filtered out by XPUB
125
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub_, NULL, 0, ZMQ_DONTWAIT));
126 127 128 129 130
}

void test_xpub_verbose_two_subs ()
{
    void *pub, *sub0, *sub1;
131
    create_xpub_with_2_subs (&pub, &sub0, &sub1);
132
    create_duplicate_subscription (pub, sub0, sub1);
133 134

    // Subscribe socket for B instead
135 136
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_b, 1));
137 138

    // Receive subscriptions from subscriber
139
    recv_array_expect_success (pub, subscribe_b_msg, 0);
140 141

    int verbose = 1;
142 143
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
144 145

    // Subscribe socket for A again
146 147
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
148 149

    // This time with VERBOSE the duplicated sub will be received
150
    recv_array_expect_success (pub, subscribe_a_msg, 0);
151 152

    // Sending A message and B Message
153
    send_string_expect_success (pub, topic_a, 0);
154

155
    send_string_expect_success (pub, topic_b, 0);
156

157 158 159
    recv_string_expect_success (sub0, topic_a, 0);
    recv_string_expect_success (sub1, topic_a, 0);
    recv_string_expect_success (sub0, topic_b, 0);
160 161

    //  Clean up.
162 163 164
    test_context_socket_close (pub);
    test_context_socket_close (sub0);
    test_context_socket_close (sub1);
165 166 167 168 169
}

void test_xpub_verboser_one_sub ()
{
    //  Create a publisher
170 171
    void *pub = test_context_socket (ZMQ_XPUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
172 173

    //  Create a subscriber
174 175
    void *sub = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
176 177

    //  Unsubscribe for A, does not exist yet
178 179
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
180 181

    //  Does not exist, so it will be filtered out by XSUB
182
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
183 184

    //  Subscribe for A
185
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
186 187

    // Receive subscriptions from subscriber
188
    recv_array_expect_success (pub, subscribe_a_msg, 0);
189 190

    //  Subscribe again for A again, XSUB will increase refcount
191
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
192 193

    //  This time it is duplicated, so it will be filtered out by XPUB
194
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
195 196

    //  Unsubscribe for A, this time it exists in XPUB
197 198
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
199 200 201

    //  XSUB refcounts and will not actually send unsub to PUB until the number
    //  of unsubs match the earlier subs
202 203
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
204 205

    // Receive unsubscriptions from subscriber
206
    recv_array_expect_success (pub, unsubscribe_a_msg, 0);
207 208

    //  XSUB only sends the last and final unsub, so XPUB will only receive 1
209
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
210 211

    //  Unsubscribe for A, does not exist anymore
212 213
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
214 215

    //  Does not exist, so it will be filtered out by XSUB
216
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
217 218

    int verbose = 1;
219 220
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
221 222

    // Subscribe socket for A again
223
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
224 225

    // Receive subscriptions from subscriber, did not exist anymore
226
    recv_array_expect_success (pub, subscribe_a_msg, 0);
227 228

    // Sending A message to make sure everything still works
229
    send_string_expect_success (pub, topic_a, 0);
230

231
    recv_string_expect_success (sub, topic_a, 0);
232 233

    //  Unsubscribe for A, this time it exists
234 235
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
236 237

    // Receive unsubscriptions from subscriber
238
    recv_array_expect_success (pub, unsubscribe_a_msg, 0);
239 240

    //  Unsubscribe for A again, it does not exist anymore so XSUB will filter
241 242
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
243 244 245

    //  XSUB only sends unsub if it matched it in its trie, IOW: it will only
    //  send it if it existed in the first place even with XPUB_VERBBOSER
246
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
247 248

    //  Clean up.
249 250
    test_context_socket_close (pub);
    test_context_socket_close (sub);
251 252 253 254
}

void test_xpub_verboser_two_subs ()
{
255
    void *pub, *sub0, *sub1;
256
    create_xpub_with_2_subs (&pub, &sub0, &sub1);
257
    create_duplicate_subscription (pub, sub0, sub1);
258 259

    //  Unsubscribe for A, this time it exists in XPUB
260 261
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
262 263

    //  sub1 is still subscribed, so no notification
264
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
265 266

    //  Unsubscribe the second socket to trigger the notification
267 268
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
269 270

    // Receive unsubscriptions since all sockets are gone
271
    recv_array_expect_success (pub, unsubscribe_a_msg, 0);
272 273

    //  Make really sure there is only one notification
274
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
275 276

    int verbose = 1;
277 278
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
279 280

    // Subscribe socket for A again
281 282
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_a, 1));
283 284

    // Subscribe socket for A again
285 286
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
287 288

    // Receive subscriptions from subscriber, did not exist anymore
289
    recv_array_expect_success (pub, subscribe_a_msg, 0);
290 291

    //  VERBOSER is set, so subs from both sockets are received
292
    recv_array_expect_success (pub, subscribe_a_msg, 0);
293 294

    // Sending A message to make sure everything still works
295
    send_string_expect_success (pub, topic_a, 0);
296

297 298
    recv_string_expect_success (sub0, topic_a, 0);
    recv_string_expect_success (sub1, topic_a, 0);
299 300

    //  Unsubscribe for A
301 302
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
303 304

    // Receive unsubscriptions from first subscriber due to VERBOSER
305
    recv_array_expect_success (pub, unsubscribe_a_msg, 0);
306 307

    //  Unsubscribe for A again from the other socket
308 309
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
310 311

    // Receive unsubscriptions from first subscriber due to VERBOSER
312
    recv_array_expect_success (pub, unsubscribe_a_msg, 0);
313 314

    //  Unsubscribe again to make sure it gets filtered now
315 316
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
317 318

    //  Unmatched, so XSUB filters even with VERBOSER
319
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
320 321

    //  Clean up.
322 323 324
    test_context_socket_close (pub);
    test_context_socket_close (sub0);
    test_context_socket_close (sub1);
325 326
}

327
int main ()
328 329 330 331 332 333 334 335 336
{
    setup_test_environment ();

    UNITY_BEGIN ();
    RUN_TEST (test_xpub_verbose_one_sub);
    RUN_TEST (test_xpub_verbose_two_subs);
    RUN_TEST (test_xpub_verboser_one_sub);
    RUN_TEST (test_xpub_verboser_two_subs);

337
    return UNITY_END ();
338
}