test_radio_dish.cpp 14.9 KB
Newer Older
somdoron's avatar
somdoron committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
somdoron's avatar
somdoron committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
somdoron's avatar
somdoron committed
32

33 34 35 36 37 38 39 40 41
#include <string.h>

#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#endif

42 43 44 45 46 47 48 49 50 51 52 53
// Helper macro to define the v4/v6 function pairs
#define MAKE_TEST_V4V6(_test)                                                  \
    static void _test##_ipv4 () { _test (false); }                             \
                                                                               \
    static void _test##_ipv6 ()                                                \
    {                                                                          \
        if (!is_ipv6_available ()) {                                           \
            TEST_IGNORE_MESSAGE ("ipv6 is not available");                     \
        }                                                                      \
        _test (true);                                                          \
    }

54
SETUP_TEARDOWN_TESTCONTEXT
somdoron's avatar
somdoron committed
55

56 57 58 59 60
void msg_send_expect_success (void *s_, const char *group_, const char *body_)
{
    zmq_msg_t msg;
    const size_t len = strlen (body_);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
somdoron's avatar
somdoron committed
61

62
    memcpy (zmq_msg_data (&msg), body_, len);
somdoron's avatar
somdoron committed
63

64
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
somdoron's avatar
somdoron committed
65

66 67 68 69 70
    int rc = zmq_msg_send (&msg, s_, 0);
    TEST_ASSERT_EQUAL_INT ((int) len, rc);

    // TODO isn't the msg closed by zmq_msg_send?
    zmq_msg_close (&msg);
somdoron's avatar
somdoron committed
71 72
}

73
void msg_recv_cmp (void *s_, const char *group_, const char *body_)
somdoron's avatar
somdoron committed
74
{
75 76 77 78 79 80 81 82 83 84 85 86
    zmq_msg_t msg;
    const size_t len = strlen (body_);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));

    int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
    TEST_ASSERT_EQUAL_INT (len, recv_rc);

    TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));

    TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);

    zmq_msg_close (&msg);
somdoron's avatar
somdoron committed
87 88
}

89
void test_leave_unjoined_fails ()
somdoron's avatar
somdoron committed
90
{
91
    void *dish = test_context_socket (ZMQ_DISH);
somdoron's avatar
somdoron committed
92

93 94
    //  Leaving a group which we didn't join
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
somdoron's avatar
somdoron committed
95

96 97
    test_context_socket_close (dish);
}
somdoron's avatar
somdoron committed
98

99 100 101
void test_join_too_long_fails ()
{
    void *dish = test_context_socket (ZMQ_DISH);
somdoron's avatar
somdoron committed
102 103 104 105 106 107

    //  Joining too long group
    char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
    for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
        too_long_group[index] = 'A';
    too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
108
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
somdoron's avatar
somdoron committed
109

110 111 112 113 114 115 116 117
    test_context_socket_close (dish);
}

void test_join_twice_fails ()
{
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
somdoron's avatar
somdoron committed
118 119

    // Duplicate Joining
120 121 122 123 124
    TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));

    test_context_socket_close (dish);
}

125
void test_radio_dish_tcp_poll (int ipv6_)
126 127 128 129 130
{
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];

    void *radio = test_context_socket (ZMQ_RADIO);
131
    bind_loopback (radio, ipv6_, my_endpoint, len);
132 133 134

    void *dish = test_context_socket (ZMQ_DISH);

135 136 137
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

138 139
    // Joining
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
somdoron's avatar
somdoron committed
140 141

    // Connecting
142
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
somdoron's avatar
somdoron committed
143

144
    msleep (SETTLE_TIME);
somdoron's avatar
somdoron committed
145

somdoron's avatar
somdoron committed
146
    //  This is not going to be sent as dish only subscribe to "Movies"
147
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
148

somdoron's avatar
somdoron committed
149
    //  This is going to be sent to the dish
150
    msg_send_expect_success (radio, "Movies", "Godfather");
somdoron's avatar
somdoron committed
151

somdoron's avatar
somdoron committed
152
    //  Check the correct message arrived
153
    msg_recv_cmp (dish, "Movies", "Godfather");
somdoron's avatar
somdoron committed
154 155

    //  Join group during connection optvallen
156
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
somdoron's avatar
somdoron committed
157 158 159 160

    zmq_sleep (1);

    //  This should arrive now as we joined the group
161
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
162

somdoron's avatar
somdoron committed
163
    //  Check the correct message arrived
164
    msg_recv_cmp (dish, "TV", "Friends");
somdoron's avatar
somdoron committed
165

166 167
    //  Leaving group
    TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
somdoron's avatar
somdoron committed
168 169 170

    zmq_sleep (1);

somdoron's avatar
somdoron committed
171
    //  This is not going to be sent as dish only subscribe to "Movies"
172
    msg_send_expect_success (radio, "TV", "Friends");
somdoron's avatar
somdoron committed
173 174

    //  This is going to be sent to the dish
175
    msg_send_expect_success (radio, "Movies", "Godfather");
somdoron's avatar
somdoron committed
176

177
    // test zmq_poll with dish
178 179 180
    zmq_pollitem_t items[] = {
      {radio, 0, ZMQ_POLLIN, 0}, // read publications
      {dish, 0, ZMQ_POLLIN, 0},  // read subscriptions
181
    };
182 183 184
    int rc = zmq_poll (items, 2, 2000);
    TEST_ASSERT_EQUAL_INT (1, rc);
    TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
185

somdoron's avatar
somdoron committed
186
    //  Check the correct message arrived
187
    msg_recv_cmp (dish, "Movies", "Godfather");
somdoron's avatar
somdoron committed
188

189 190 191
    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
192
MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
somdoron's avatar
somdoron committed
193

194
void test_dish_connect_fails (int ipv6_)
195 196 197
{
    void *dish = test_context_socket (ZMQ_DISH);

198 199 200 201 202
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";

203
    //  Connecting dish should fail
204
    TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
205 206 207

    test_context_socket_close (dish);
}
208
MAKE_TEST_V4V6 (test_dish_connect_fails)
209

210
void test_radio_bind_fails (int ipv6_)
211 212 213
{
    void *radio = test_context_socket (ZMQ_RADIO);

214 215 216
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));

217 218 219 220 221 222 223
    //  Connecting dish should fail
    //  Bind radio should fail
    TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
                               zmq_bind (radio, "udp://*:5556"));

    test_context_socket_close (radio);
}
224
MAKE_TEST_V4V6 (test_radio_bind_fails)
225

226
void test_radio_dish_udp (int ipv6_)
227 228 229 230
{
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

231 232 233 234 235 236 237
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";

238
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
239
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
240 241 242 243 244 245 246 247 248 249 250

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");
    msg_recv_cmp (dish, "TV", "Friends");

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
251
MAKE_TEST_V4V6 (test_radio_dish_udp)
252

253 254 255 256 257 258 259 260
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"

static const char *mcast_url (int ipv6_)
{
    if (ipv6_) {
        return "udp://[" MCAST_IPV6 "]:5555";
    } else {
261
        return "udp://" MCAST_IPV4 ":5555";
262 263 264 265 266 267 268 269
    }
}

//  OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

270 271 272 273 274 275 276
union sa_u
{
    struct sockaddr generic;
    struct sockaddr_in ipv4;
    struct sockaddr_in6 ipv6;
};

277 278 279 280 281
//  Test if multicast is available on this machine by attempting to
//  send a receive a multicast datagram
static bool is_multicast_available (int ipv6_)
{
    int family = ipv6_ ? AF_INET6 : AF_INET;
282 283
    fd_t bind_sock = retired_fd;
    fd_t send_sock = retired_fd;
284 285 286 287
    int port = 5555;
    bool success = false;
    const char *msg = "it works";
    char buf[32];
288 289
    union sa_u any;
    union sa_u mcast;
290 291 292 293
    socklen_t sl;
    int rc;

    if (ipv6_) {
294 295
        struct sockaddr_in6 *any_ipv6 = &any.ipv6;
        struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315

        any_ipv6->sin6_family = AF_INET6;
        any_ipv6->sin6_port = htons (port);
        any_ipv6->sin6_flowinfo = 0;
        any_ipv6->sin6_scope_id = 0;

        rc = inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
        if (rc == 0) {
            goto out;
        }

        *mcast_ipv6 = *any_ipv6;

        rc = inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
        if (rc == 0) {
            goto out;
        }

        sl = sizeof (*any_ipv6);
    } else {
316 317
        struct sockaddr_in *any_ipv4 = &any.ipv4;
        struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346

        any_ipv4->sin_family = AF_INET;
        any_ipv4->sin_port = htons (5555);

        rc = inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
        if (rc == 0) {
            goto out;
        }

        *mcast_ipv4 = *any_ipv4;

        rc = inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
        if (rc == 0) {
            goto out;
        }

        sl = sizeof (*any_ipv4);
    }

    bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
    if (bind_sock < 0) {
        goto out;
    }

    send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
    if (bind_sock < 0) {
        goto out;
    }

347
    rc = bind (bind_sock, &any.generic, sl);
348 349 350 351 352 353
    if (rc < 0) {
        goto out;
    }

    if (ipv6_) {
        struct ipv6_mreq mreq;
354
        struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
355 356 357 358

        mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
        mreq.ipv6mr_interface = 0;

359 360
        rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
                         as_setsockopt_opt_t (&mreq), sizeof (mreq));
361 362 363 364 365
        if (rc < 0) {
            goto out;
        }

        int loop = 1;
366 367
        rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
                         as_setsockopt_opt_t (&loop), sizeof (loop));
368 369 370 371 372
        if (rc < 0) {
            goto out;
        }
    } else {
        struct ip_mreq mreq;
373
        struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
374 375 376 377

        mreq.imr_multiaddr = mcast_ipv4->sin_addr;
        mreq.imr_interface.s_addr = htonl (INADDR_ANY);

378 379
        rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                         as_setsockopt_opt_t (&mreq), sizeof (mreq));
380 381 382 383 384
        if (rc < 0) {
            goto out;
        }

        int loop = 1;
385 386
        rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
                         as_setsockopt_opt_t (&loop), sizeof (loop));
387 388 389 390 391 392 393
        if (rc < 0) {
            goto out;
        }
    }

    msleep (SETTLE_TIME);

394 395
    rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
                 &mcast.generic, sl);
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
    if (rc < 0) {
        goto out;
    }

    msleep (SETTLE_TIME);

    rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
    if (rc < 0) {
        goto out;
    }

    buf[rc] = '\0';

    success = (strcmp (msg, buf) == 0);

out:
    if (bind_sock >= 0) {
        close (bind_sock);
    }

    if (send_sock >= 0) {
        close (send_sock);
    }

    return success;
}

423 424 425 426 427 428 429 430
static void ignore_if_unavailable (int ipv6_)
{
    if (ipv6_ && !is_ipv6_available ())
        TEST_IGNORE_MESSAGE ("No IPV6 available");
    if (!is_multicast_available (ipv6_))
        TEST_IGNORE_MESSAGE ("No multicast available");
}

431 432
static void test_radio_dish_mcast (int ipv6_)
{
433 434
    ignore_if_unavailable (ipv6_);

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    const char *url = mcast_url (ipv6_);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");
    msg_recv_cmp (dish, "TV", "Friends");

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_mcast)

static void test_radio_dish_no_loop (int ipv6_)
{
462 463 464 465
#ifdef _WIN32
    TEST_IGNORE_MESSAGE (
      "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)");
#endif
466 467
    ignore_if_unavailable (ipv6_);

468 469 470 471 472 473 474 475
    void *radio = test_context_socket (ZMQ_RADIO);
    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

476
    //  Disable multicast loop for radio
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
    int loop = 0;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));

    const char *url = mcast_url (ipv6_);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));

    msleep (SETTLE_TIME);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    msg_send_expect_success (radio, "TV", "Friends");

    // Looping is disabled, we shouldn't receive anything
    msleep (SETTLE_TIME);

495
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
496 497 498 499 500 501

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_no_loop)

502 503 504
int main (void)
{
    setup_test_environment ();
somdoron's avatar
somdoron committed
505

506 507 508 509
    UNITY_BEGIN ();
    RUN_TEST (test_leave_unjoined_fails);
    RUN_TEST (test_join_too_long_fails);
    RUN_TEST (test_join_twice_fails);
510 511 512 513 514 515 516 517
    RUN_TEST (test_radio_bind_fails_ipv4);
    RUN_TEST (test_radio_bind_fails_ipv6);
    RUN_TEST (test_dish_connect_fails_ipv4);
    RUN_TEST (test_dish_connect_fails_ipv6);
    RUN_TEST (test_radio_dish_tcp_poll_ipv4);
    RUN_TEST (test_radio_dish_tcp_poll_ipv6);
    RUN_TEST (test_radio_dish_udp_ipv4);
    RUN_TEST (test_radio_dish_udp_ipv6);
518

519 520
    RUN_TEST (test_radio_dish_mcast_ipv4);
    RUN_TEST (test_radio_dish_no_loop_ipv4);
521

522 523
    RUN_TEST (test_radio_dish_mcast_ipv6);
    RUN_TEST (test_radio_dish_no_loop_ipv6);
524

525
    return UNITY_END ();
somdoron's avatar
somdoron committed
526
}