test_radio_dish.cpp 14.7 KB
Newer Older
somdoron's avatar
somdoron committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
somdoron's avatar
somdoron committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
somdoron's avatar
somdoron committed
32

33 34
#include <unity.h>

35 36 37 38 39 40 41 42 43 44 45 46
// Helper macro to define the v4/v6 function pairs
#define MAKE_TEST_V4V6(_test)                                                  \
    static void _test##_ipv4 () { _test (false); }                             \
                                                                               \
    static void _test##_ipv6 ()                                                \
    {                                                                          \
        if (!is_ipv6_available ()) {                                           \
            TEST_IGNORE_MESSAGE ("ipv6 is not available");                     \
        }                                                                      \
        _test (true);                                                          \
    }

47
void setUp ()
somdoron's avatar
somdoron committed
48
{
49 50
    setup_test_context ();
}
somdoron's avatar
somdoron committed
51

52 53 54 55
void tearDown ()
{
    teardown_test_context ();
}
somdoron's avatar
somdoron committed
56

57 58 59 60 61
void msg_send_expect_success (void *s_, const char *group_, const char *body_)
{
    zmq_msg_t msg;
    const size_t len = strlen (body_);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
somdoron's avatar
somdoron committed
62

63
    memcpy (zmq_msg_data (&msg), body_, len);
somdoron's avatar
somdoron committed
64

65
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
somdoron's avatar
somdoron committed
66

67 68 69 70 71
    int rc = zmq_msg_send (&msg, s_, 0);
    TEST_ASSERT_EQUAL_INT ((int) len, rc);

    // TODO isn't the msg closed by zmq_msg_send?
    zmq_msg_close (&msg);
somdoron's avatar
somdoron committed
72 73
}

74
void msg_recv_cmp (void *s_, const char *group_, const char *body_)
somdoron's avatar
somdoron committed
75
{
76 77 78 79 80 81 82 83 84 85 86 87
    zmq_msg_t msg;
    const size_t len = strlen (body_);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));

    int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
    TEST_ASSERT_EQUAL_INT (len, recv_rc);

    TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));

    TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);

    zmq_msg_close (&msg);
somdoron's avatar
somdoron committed
88 89
}

90
void test_leave_unjoined_fails ()
somdoron's avatar
somdoron committed
91
{
92
    void *dish = test_context_socket (ZMQ_DISH);
somdoron's avatar
somdoron committed
93

94 95
    //  Leaving a group which we didn't join
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
somdoron's avatar
somdoron committed
96

97 98
    test_context_socket_close (dish);
}
somdoron's avatar
somdoron committed
99

100 101 102
void test_join_too_long_fails ()
{
    void *dish = test_context_socket (ZMQ_DISH);
somdoron's avatar
somdoron committed
103 104 105 106 107 108

    //  Joining too long group
    char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
    for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
        too_long_group[index] = 'A';
    too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
109
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
somdoron's avatar
somdoron committed
110

111 112 113 114 115 116 117 118
    test_context_socket_close (dish);
}

void test_join_twice_fails ()
{
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
somdoron's avatar
somdoron committed
119 120

    // Duplicate Joining
121 122 123 124 125
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));

    test_context_socket_close (dish);
}

126
void test_radio_dish_tcp_poll (int ipv6_)
127 128 129 130 131
{
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];

    void *radio = test_context_socket (ZMQ_RADIO);
132
    bind_loopback (radio, ipv6_, my_endpoint, len);
133 134 135

    void *dish = test_context_socket (ZMQ_DISH);

136 137 138
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

139 140
    // Joining
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
somdoron's avatar
somdoron committed
141 142

    // Connecting
143
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
somdoron's avatar
somdoron committed
144

145
    msleep (SETTLE_TIME);
somdoron's avatar
somdoron committed
146

somdoron's avatar
somdoron committed
147
    //  This is not going to be sent as dish only subscribe to "Movies"
148
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
149

somdoron's avatar
somdoron committed
150
    //  This is going to be sent to the dish
151
    msg_send_expect_success (radio, "Movies", "Godfather");
somdoron's avatar
somdoron committed
152

somdoron's avatar
somdoron committed
153
    //  Check the correct message arrived
154
    msg_recv_cmp (dish, "Movies", "Godfather");
somdoron's avatar
somdoron committed
155 156

    //  Join group during connection optvallen
157
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
somdoron's avatar
somdoron committed
158 159 160 161

    zmq_sleep (1);

    //  This should arrive now as we joined the group
162
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
163

somdoron's avatar
somdoron committed
164
    //  Check the correct message arrived
165
    msg_recv_cmp (dish, "TV", "Friends");
somdoron's avatar
somdoron committed
166

167 168
    //  Leaving group
    TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
somdoron's avatar
somdoron committed
169 170 171

    zmq_sleep (1);

somdoron's avatar
somdoron committed
172
    //  This is not going to be sent as dish only subscribe to "Movies"
173
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
174 175

    //  This is going to be sent to the dish
176
    msg_send_expect_success (radio, "Movies", "Godfather");
somdoron's avatar
somdoron committed
177

178
    // test zmq_poll with dish
179 180 181
    zmq_pollitem_t items[] = {
      {radio, 0, ZMQ_POLLIN, 0}, // read publications
      {dish, 0, ZMQ_POLLIN, 0},  // read subscriptions
182
    };
183 184 185
    int rc = zmq_poll (items, 2, 2000);
    TEST_ASSERT_EQUAL_INT (1, rc);
    TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
186

somdoron's avatar
somdoron committed
187
    //  Check the correct message arrived
188
    msg_recv_cmp (dish, "Movies", "Godfather");
somdoron's avatar
somdoron committed
189

190 191 192
    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
193
MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
somdoron's avatar
somdoron committed
194

195
void test_dish_connect_fails (int ipv6_)
196 197 198
{
    void *dish = test_context_socket (ZMQ_DISH);

199 200 201 202 203
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";

204
    //  Connecting dish should fail
205
    TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
206 207 208

    test_context_socket_close (dish);
}
209
MAKE_TEST_V4V6 (test_dish_connect_fails)
210

211
void test_radio_bind_fails (int ipv6_)
212 213 214
{
    void *radio = test_context_socket (ZMQ_RADIO);

215 216 217
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));

218 219 220 221 222 223 224
    //  Connecting dish should fail
    //  Bind radio should fail
    TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
                               zmq_bind (radio, "udp://*:5556"));

    test_context_socket_close (radio);
}
225
MAKE_TEST_V4V6 (test_radio_bind_fails)
226

227
void test_radio_dish_udp (int ipv6_)
228 229 230 231
{
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

232 233 234 235 236 237 238
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";

239
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
240
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
241 242 243 244 245 246 247 248 249 250 251

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");
    msg_recv_cmp (dish, "TV", "Friends");

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
252
MAKE_TEST_V4V6 (test_radio_dish_udp)
253

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"

static const char *mcast_url (int ipv6_)
{
    if (ipv6_) {
        return "udp://[" MCAST_IPV6 "]:5555";
    } else {
        return "udp://[" MCAST_IPV4 "]:5555";
    }
}

//  OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

271 272 273 274 275 276 277
union sa_u
{
    struct sockaddr generic;
    struct sockaddr_in ipv4;
    struct sockaddr_in6 ipv6;
};

278 279 280 281 282
//  Test if multicast is available on this machine by attempting to
//  send a receive a multicast datagram
static bool is_multicast_available (int ipv6_)
{
    int family = ipv6_ ? AF_INET6 : AF_INET;
283 284
    fd_t bind_sock = retired_fd;
    fd_t send_sock = retired_fd;
285 286 287 288
    int port = 5555;
    bool success = false;
    const char *msg = "it works";
    char buf[32];
289 290
    union sa_u any;
    union sa_u mcast;
291 292 293 294
    socklen_t sl;
    int rc;

    if (ipv6_) {
295 296
        struct sockaddr_in6 *any_ipv6 = &any.ipv6;
        struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316

        any_ipv6->sin6_family = AF_INET6;
        any_ipv6->sin6_port = htons (port);
        any_ipv6->sin6_flowinfo = 0;
        any_ipv6->sin6_scope_id = 0;

        rc = inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
        if (rc == 0) {
            goto out;
        }

        *mcast_ipv6 = *any_ipv6;

        rc = inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
        if (rc == 0) {
            goto out;
        }

        sl = sizeof (*any_ipv6);
    } else {
317 318
        struct sockaddr_in *any_ipv4 = &any.ipv4;
        struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

        any_ipv4->sin_family = AF_INET;
        any_ipv4->sin_port = htons (5555);

        rc = inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
        if (rc == 0) {
            goto out;
        }

        *mcast_ipv4 = *any_ipv4;

        rc = inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
        if (rc == 0) {
            goto out;
        }

        sl = sizeof (*any_ipv4);
    }

    bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
    if (bind_sock < 0) {
        goto out;
    }

    send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
    if (bind_sock < 0) {
        goto out;
    }

348
    rc = bind (bind_sock, &any.generic, sl);
349 350 351 352 353 354
    if (rc < 0) {
        goto out;
    }

    if (ipv6_) {
        struct ipv6_mreq mreq;
355
        struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
356 357 358 359

        mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
        mreq.ipv6mr_interface = 0;

360 361
        rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
                         as_setsockopt_opt_t (&mreq), sizeof (mreq));
362 363 364 365 366
        if (rc < 0) {
            goto out;
        }

        int loop = 1;
367 368
        rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
                         as_setsockopt_opt_t (&loop), sizeof (loop));
369 370 371 372 373
        if (rc < 0) {
            goto out;
        }
    } else {
        struct ip_mreq mreq;
374
        struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
375 376 377 378

        mreq.imr_multiaddr = mcast_ipv4->sin_addr;
        mreq.imr_interface.s_addr = htonl (INADDR_ANY);

379 380
        rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                         as_setsockopt_opt_t (&mreq), sizeof (mreq));
381 382 383 384 385
        if (rc < 0) {
            goto out;
        }

        int loop = 1;
386 387
        rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
                         as_setsockopt_opt_t (&loop), sizeof (loop));
388 389 390 391 392 393 394
        if (rc < 0) {
            goto out;
        }
    }

    msleep (SETTLE_TIME);

395 396
    rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
                 &mcast.generic, sl);
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
    if (rc < 0) {
        goto out;
    }

    msleep (SETTLE_TIME);

    rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
    if (rc < 0) {
        goto out;
    }

    buf[rc] = '\0';

    success = (strcmp (msg, buf) == 0);

out:
    if (bind_sock >= 0) {
        close (bind_sock);
    }

    if (send_sock >= 0) {
        close (send_sock);
    }

    return success;
}

static void test_radio_dish_mcast (int ipv6_)
{
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *url = mcast_url (ipv6_);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");
    msg_recv_cmp (dish, "TV", "Friends");

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_mcast)

static void test_radio_dish_no_loop (int ipv6_)
{
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    //  Disable multicast loop
    int loop = 0;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));

    const char *url = mcast_url (ipv6_);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");

    // Looping is disabled, we shouldn't receive anything
    msleep (SETTLE_TIME);

480
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
481 482 483 484 485 486

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_no_loop)

487 488 489
int main (void)
{
    setup_test_environment ();
somdoron's avatar
somdoron committed
490

491 492 493 494
    UNITY_BEGIN ();
    RUN_TEST (test_leave_unjoined_fails);
    RUN_TEST (test_join_too_long_fails);
    RUN_TEST (test_join_twice_fails);
495 496 497 498 499 500 501 502
    RUN_TEST (test_radio_bind_fails_ipv4);
    RUN_TEST (test_radio_bind_fails_ipv6);
    RUN_TEST (test_dish_connect_fails_ipv4);
    RUN_TEST (test_dish_connect_fails_ipv6);
    RUN_TEST (test_radio_dish_tcp_poll_ipv4);
    RUN_TEST (test_radio_dish_tcp_poll_ipv6);
    RUN_TEST (test_radio_dish_udp_ipv4);
    RUN_TEST (test_radio_dish_udp_ipv6);
503

504 505 506 507 508 509 510 511 512 513 514 515 516
    bool ipv4_mcast = is_multicast_available (false);
    bool ipv6_mcast = is_ipv6_available () && is_multicast_available (true);

    if (ipv4_mcast) {
        RUN_TEST (test_radio_dish_mcast_ipv4);
        RUN_TEST (test_radio_dish_no_loop_ipv4);
    }

    if (ipv6_mcast) {
        RUN_TEST (test_radio_dish_mcast_ipv6);
        RUN_TEST (test_radio_dish_no_loop_ipv6);
    }

517
    return UNITY_END ();
somdoron's avatar
somdoron committed
518
}