udp_engine.cpp 17.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

This file is part of libzmq, the ZeroMQ core engine in C++.

libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.

libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
31

32
#if !defined ZMQ_HAVE_WINDOWS
33
#include <sys/types.h>
34
#include <unistd.h>
35 36 37
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
38 39 40
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
41 42
#endif

43
#include "udp_address.hpp"
44 45 46 47 48
#include "udp_engine.hpp"
#include "session_base.hpp"
#include "err.hpp"
#include "ip.hpp"

49 50 51 52 53
//  OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

54 55 56 57
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

58
//  Constructs an unplugged engine that keeps a copy of the socket options.
//  No socket is opened here; the descriptor stays retired until init ()
//  assigns one.
zmq::udp_engine_t::udp_engine_t (const options_t &options_) :
    _plugged (false),
    //  Use the same sentinel the destructor and init () compare against
    //  instead of a bare -1.
    _fd (retired_fd),
    _session (NULL),
    _handle (static_cast<handle_t> (NULL)),
    _address (NULL),
    _options (options_),
    _send_enabled (false),
    _recv_enabled (false)
{
}

70
//  Destroys the engine, closing the UDP socket if one is still open.
//  The engine must already be unplugged.
zmq::udp_engine_t::~udp_engine_t ()
{
    zmq_assert (!_plugged);

    //  Nothing to release if no socket is held.
    if (_fd == retired_fd)
        return;

#ifdef ZMQ_HAVE_WINDOWS
    const int rc = closesocket (_fd);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = close (_fd);
    errno_assert (rc == 0);
#endif

    //  Mark the descriptor as released.
    _fd = retired_fd;
}

86
//  Records the address and the requested directions, then opens a
//  non-blocking UDP socket for the address family of 'address_'.
//  At least one of 'send_' and 'recv_' must be set.
//  Returns 0 on success, -1 if the socket could not be opened.
int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
{
    zmq_assert (address_);
    zmq_assert (send_ || recv_);

    _address = address_;
    _recv_enabled = recv_;
    _send_enabled = send_;

    _fd = open_socket (_address->resolved.udp_addr->family (), SOCK_DGRAM,
                       IPPROTO_UDP);
    if (_fd != retired_fd) {
        unblock_socket (_fd);
        return 0;
    }

    return -1;
}

104
//  Attaches the engine to 'session_' and to the poller of 'io_thread_',
//  then configures the socket for the enabled directions:
//   - optional bind to a device,
//   - for sending: the destination address and, for multicast targets,
//     the loop, hop count and outgoing interface options,
//   - for receiving: address reuse, bind, and multicast group membership.
//  Any failure is reported through error (), which terminates the engine.
void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    zmq_assert (!_plugged);
    _plugged = true;

    zmq_assert (!_session);
    zmq_assert (session_);
    _session = session_;

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    _handle = add_fd (_fd);

    const udp_address_t *const udp_addr = _address->resolved.udp_addr;

    //  Accumulates the results of the configuration calls below; any
    //  non-zero bit means at least one of them failed.
    int rc = 0;

    //  Bind the socket to a device if applicable
    if (!_options.bound_device.empty ()) {
        rc |= bind_to_device (_fd, _options.bound_device);
        if (rc != 0) {
            assert_success_or_recoverable (_fd, rc);
            error (connection_error);
            return;
        }
    }

    if (_send_enabled) {
        if (_options.raw_socket) {
            //  Raw mode: datagrams go to _raw_address, which
            //  resolve_raw_address () fills in.
            /// XXX fixme ?
            _out_address = reinterpret_cast<sockaddr *> (&_raw_address);
            _out_address_len =
              static_cast<zmq_socklen_t> (sizeof (sockaddr_in));
        } else {
            const ip_addr_t *out = udp_addr->target_addr ();
            _out_address = out->as_sockaddr ();
            _out_address_len = out->sockaddr_len ();

            if (out->is_multicast ()) {
                const bool is_ipv6 = (out->family () == AF_INET6);

                rc |= set_udp_multicast_loop (_fd, is_ipv6,
                                              _options.multicast_loop);

                //  Only override the hop count when one was configured.
                if (_options.multicast_hops > 0)
                    rc |= set_udp_multicast_ttl (_fd, is_ipv6,
                                                 _options.multicast_hops);

                rc |= set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
            }
        }
    }

    if (_recv_enabled) {
        rc |= set_udp_reuse_address (_fd, true);

        const ip_addr_t *bind_addr = udp_addr->bind_addr ();
        ip_addr_t any = ip_addr_t::any (bind_addr->family ());
        const bool multicast = udp_addr->is_mcast ();

        //  Unicast binds to the configured address as is.
        const ip_addr_t *real_bind_addr = bind_addr;

        if (multicast) {
            //  Multicast addresses should be allowed to bind to more than
            //  one port as all ports should receive the message
            rc |= set_udp_reuse_port (_fd, true);

            //  In multicast we should bind ANY and use the mreq struct to
            //  specify the interface
            any.set_port (bind_addr->port ());
            real_bind_addr = &any;
        }

        if (rc != 0) {
            error (protocol_error);
            return;
        }

#ifdef ZMQ_HAVE_VXWORKS
        rc |= bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
                    real_bind_addr->sockaddr_len ());
#else
        rc |= bind (_fd, real_bind_addr->as_sockaddr (),
                    real_bind_addr->sockaddr_len ());
#endif
        if (rc != 0) {
            assert_success_or_recoverable (_fd, rc);
            error (connection_error);
            return;
        }

        if (multicast)
            rc |= add_membership (_fd, udp_addr);
    }

    if (rc != 0) {
        error (protocol_error);
        return;
    }

    if (_send_enabled)
        set_pollout (_handle);

    if (_recv_enabled) {
        set_pollin (_handle);

        //  Call restart output to drop all join/leave commands
        restart_output ();
    }
}
222

223 224 225 226 227 228 229 230 231 232 233 234 235 236
//  Enables or disables multicast loopback on socket 's_', using the
//  IPv6 or IPv4 flavour of the option depending on 'is_ipv6_'.
//  Returns the setsockopt result.
int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
                                               bool is_ipv6_,
                                               bool loop_)
{
    const int level = is_ipv6_ ? IPPROTO_IPV6 : IPPROTO_IP;
    const int optname = is_ipv6_ ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP;

    //  The option value is passed as an int.
    int loop = 0;
    if (loop_)
        loop = 1;

    const int rc = setsockopt (s_, level, optname,
                               reinterpret_cast<char *> (&loop), sizeof (loop));
    assert_success_or_recoverable (s_, rc);
    return rc;
}
244

245 246 247
//  Sets the hop limit for outgoing multicast datagrams on socket 's_'.
//  IPv4 sockets use IP_MULTICAST_TTL; IPv6 sockets use
//  IPV6_MULTICAST_HOPS. IP_MULTICAST_TTL is an IPPROTO_IP option number
//  and does not name the hop limit at the IPPROTO_IPV6 level.
//  Returns the setsockopt result.
int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)
{
    int level;
    int optname;

    if (is_ipv6_) {
        level = IPPROTO_IPV6;
        optname = IPV6_MULTICAST_HOPS;
    } else {
        level = IPPROTO_IP;
        optname = IP_MULTICAST_TTL;
    }

    const int rc = setsockopt (s_, level, optname,
                               reinterpret_cast<char *> (&hops_), sizeof (hops_));
    assert_success_or_recoverable (s_, rc);
    return rc;
}

//  Selects the interface used for outgoing multicast datagrams on 's_'.
//  IPv4 uses the bind address of 'addr_' unless it is INADDR_ANY; IPv6
//  uses the bind interface index of 'addr_' when it is positive.
//  When no interface is specified the socket is left untouched and 0 is
//  returned; otherwise the setsockopt result is returned.
int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
                                                bool is_ipv6_,
                                                const udp_address_t *addr_)
{
    int rc = 0;

    if (!is_ipv6_) {
        struct in_addr iface_addr = addr_->bind_addr ()->ipv4.sin_addr;

        if (iface_addr.s_addr != INADDR_ANY)
            rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF,
                             reinterpret_cast<char *> (&iface_addr),
                             sizeof (iface_addr));
    } else {
        int iface_index = addr_->bind_if ();

        //  If a bind interface is provided we tell the
        //  kernel to use it to send multicast packets
        if (iface_index > 0)
            rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
                             reinterpret_cast<char *> (&iface_index),
                             sizeof (iface_index));
    }

    assert_success_or_recoverable (s_, rc);
    return rc;
}

//  Turns SO_REUSEADDR on or off for socket 's_'.
//  Returns the setsockopt result.
int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
{
    //  bool converts to exactly 0 or 1, which is what the option expects.
    int flag = static_cast<int> (on_);

    const int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
                               reinterpret_cast<char *> (&flag), sizeof (flag));
    assert_success_or_recoverable (s_, rc);
    return rc;
}

//  Turns SO_REUSEPORT on or off for socket 's_'. Where the option is not
//  defined this is a no-op reporting success.
//  Returns 0 or the setsockopt result.
int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
{
#ifdef SO_REUSEPORT
    int flag = static_cast<int> (on_);

    int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
                         reinterpret_cast<char *> (&flag), sizeof (flag));
    assert_success_or_recoverable (s_, rc);
    return rc;
#else
    return 0;
#endif
}

//  Joins socket 's_' to the multicast group given by the target address
//  of 'addr_'. The local interface comes from the bind address (IPv4) or
//  the bind interface index (IPv6) of 'addr_'.
//  For any other address family nothing is done and 0 is returned;
//  otherwise the setsockopt result is returned.
int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
{
    const ip_addr_t *mcast_addr = addr_->target_addr ();
    int rc = 0;

    if (mcast_addr->family () == AF_INET6) {
        const int iface = addr_->bind_if ();

        zmq_assert (iface >= -1);

        struct ipv6_mreq mreq;
        mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
        mreq.ipv6mr_interface = iface;

        rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
                         reinterpret_cast<char *> (&mreq), sizeof (mreq));
    } else if (mcast_addr->family () == AF_INET) {
        struct ip_mreq mreq;
        mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
        mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr;

        rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                         reinterpret_cast<char *> (&mreq), sizeof (mreq));
    }

    assert_success_or_recoverable (s_, rc);
    return rc;
}

//  Reports 'reason_' to the session and then terminates the engine.
//  terminate () deletes this object, so callers must not touch the engine
//  after this returns.
void zmq::udp_engine_t::error (error_reason_t reason_)
{
    zmq_assert (_session);

    //  Notify the session first; the engine is gone after terminate ().
    _session->engine_error (reason_);

    terminate ();
}

351
void zmq::udp_engine_t::terminate ()
352
{
353 354
    zmq_assert (_plugged);
    _plugged = false;
355

356
    rm_fd (_handle);
357 358 359 360 361 362 363

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    delete this;
}

364 365
//  Initialises 'msg_' with the NUL-terminated text "<ipv4>:<port>" built
//  from 'addr_' and flags it as having more frames to follow.
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_,
                                         const sockaddr_in *addr_)
{
    //  Dotted-quad form of the address and its length.
    const char *const name = inet_ntoa (addr_->sin_addr);
    const size_t name_len = strlen (name);

    //  A 16-bit port prints as at most 5 digits, plus the NUL.
    char port[6];
    const int port_len =
      sprintf (port, "%d", static_cast<int> (ntohs (addr_->sin_port)));
    zmq_assert (port_len > 0);

    //  name + ':' + port + terminating NUL
    const int size = static_cast<int> (name_len) + 1 + port_len + 1;
    const int rc = msg_->init_size (size);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::more);

    //  The lengths are already known, so place each piece by offset
    //  rather than using strcpy/strcat.
    char *const address = static_cast<char *> (msg_->data ());
    const size_t port_offset = name_len + 1;
    memcpy (address, name, name_len);
    address[name_len] = ':';
    memcpy (address + port_offset, port, static_cast<size_t> (port_len));
    address[port_offset + static_cast<size_t> (port_len)] = 0;
}

392
int zmq::udp_engine_t::resolve_raw_address (const char *name_, size_t length_)
393
{
394
    memset (&_raw_address, 0, sizeof _raw_address);
395 396 397 398 399

    const char *delimiter = NULL;

    // Find delimiter, cannot use memrchr as it is not supported on windows
    if (length_ != 0) {
400
        int chars_left = static_cast<int> (length_);
401
        const char *current_char = name_ + length_;
402 403 404 405 406 407 408 409
        do {
            if (*(--current_char) == ':') {
                delimiter = current_char;
                break;
            }
        } while (--chars_left != 0);
    }

410 411 412 413 414
    if (!delimiter) {
        errno = EINVAL;
        return -1;
    }

415 416
    const std::string addr_str (name_, delimiter - name_);
    const std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
417 418

    //  Parse the port number (0 is not a valid port).
419
    const uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ()));
420 421 422 423 424
    if (port == 0) {
        errno = EINVAL;
        return -1;
    }

425 426 427
    _raw_address.sin_family = AF_INET;
    _raw_address.sin_port = htons (port);
    _raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
428

429
    if (_raw_address.sin_addr.s_addr == INADDR_NONE) {
430 431 432 433 434 435 436
        errno = EINVAL;
        return -1;
    }

    return 0;
}

437
void zmq::udp_engine_t::out_event ()
438 439
{
    msg_t group_msg;
440
    int rc = _session->pull_msg (&group_msg);
441 442 443 444
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        msg_t body_msg;
445
        rc = _session->pull_msg (&body_msg);
446 447
        //  If there's a group, there should also be a body
        errno_assert (rc == 0);
448

449 450
        const size_t group_size = group_msg.size ();
        const size_t body_size = body_msg.size ();
451
        size_t size;
452

453
        if (_options.raw_socket) {
454 455
            rc = resolve_raw_address (static_cast<char *> (group_msg.data ()),
                                      group_size);
456 457 458 459 460 461

            //  We discard the message if address is not valid
            if (rc != 0) {
                rc = group_msg.close ();
                errno_assert (rc == 0);

462
                rc = body_msg.close ();
463 464 465
                errno_assert (rc == 0);

                return;
466
            }
467 468 469

            size = body_size;

470
            memcpy (_out_buffer, body_msg.data (), body_size);
471
        } else {
472 473
            size = group_size + body_size + 1;

474
            // TODO: check if larger than maximum size
475 476 477
            _out_buffer[0] = static_cast<unsigned char> (group_size);
            memcpy (_out_buffer + 1, group_msg.data (), group_size);
            memcpy (_out_buffer + 1 + group_size, body_msg.data (), body_size);
478
        }
479 480 481 482 483 484 485

        rc = group_msg.close ();
        errno_assert (rc == 0);

        body_msg.close ();
        errno_assert (rc == 0);

486
#ifdef ZMQ_HAVE_WINDOWS
487
        rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
488
                     _out_address_len);
489
#elif defined ZMQ_HAVE_VXWORKS
490
        rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
491
                     (sockaddr *) _out_address, _out_address_len);
492
#else
493
        rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
494
#endif
495 496 497 498 499 500 501 502 503 504 505 506 507 508
        if (rc < 0) {
#ifdef ZMQ_HAVE_WINDOWS
            if (WSAGetLastError () != WSAEWOULDBLOCK) {
                assert_success_or_recoverable (_fd, rc);
                error (connection_error);
            }
#else
            if (rc != EWOULDBLOCK) {
                assert_success_or_recoverable (_fd, rc);
                error (connection_error);
            }
#endif
        }
    } else {
509
        reset_pollout (_handle);
510
    }
511 512
}

513
//  Returns the engine's endpoint pair; this engine always hands back its
//  _empty_endpoint member.
const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
{
    return _empty_endpoint;
}

518
void zmq::udp_engine_t::restart_output ()
519 520
{
    //  If we don't support send we just drop all messages
521
    if (!_send_enabled) {
522
        msg_t msg;
523
        while (_session->pull_msg (&msg) == 0)
524
            msg.close ();
525
    } else {
526
        set_pollout (_handle);
527 528 529 530
        out_event ();
    }
}

531
void zmq::udp_engine_t::in_event ()
532
{
533
    sockaddr_storage in_address;
534 535 536 537
    zmq_socklen_t in_addrlen =
      static_cast<zmq_socklen_t> (sizeof (sockaddr_storage));

    const int nbytes =
538
      recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
539
                reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
540 541

    if (nbytes < 0) {
542
#ifdef ZMQ_HAVE_WINDOWS
543 544 545 546
        if (WSAGetLastError () != WSAEWOULDBLOCK) {
            assert_success_or_recoverable (_fd, nbytes);
            error (connection_error);
        }
547
#else
548 549 550 551 552
        if (nbytes != EWOULDBLOCK) {
            assert_success_or_recoverable (_fd, nbytes);
            error (connection_error);
        }
#endif
553 554
        return;
    }
555

556
    int rc;
557
    int body_size;
558
    int body_offset;
559
    msg_t msg;
560

561
    if (_options.raw_socket) {
562 563
        zmq_assert (in_address.ss_family == AF_INET);
        sockaddr_to_msg (&msg, reinterpret_cast<sockaddr_in *> (&in_address));
564 565 566

        body_size = nbytes;
        body_offset = 0;
567
    } else {
568 569 570 571
        // TODO in out_event, the group size is an *unsigned* char. what is
        // the maximum value?
        const char *group_buffer = _in_buffer + 1;
        const int group_size = _in_buffer[0];
572 573 574 575 576

        rc = msg.init_size (group_size);
        errno_assert (rc == 0);
        msg.set_flags (msg_t::more);
        memcpy (msg.data (), group_buffer, group_size);
577

578 579 580
        //  This doesn't fit, just ingore
        if (nbytes - 1 < group_size)
            return;
581

582
        body_size = nbytes - 1 - group_size;
583
        body_offset = 1 + group_size;
584
    }
585
    // Push group description to session
586
    rc = _session->push_msg (&msg);
587
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
588

589
    //  Group description message doesn't fit in the pipe, drop
590
    if (rc != 0) {
591 592
        rc = msg.close ();
        errno_assert (rc == 0);
593

594
        reset_pollin (_handle);
595
        return;
596
    }
597 598 599 600 601

    rc = msg.close ();
    errno_assert (rc == 0);
    rc = msg.init_size (body_size);
    errno_assert (rc == 0);
602
    memcpy (msg.data (), _in_buffer + body_offset, body_size);
603 604

    // Push message body to session
605
    rc = _session->push_msg (&msg);
606
    // Message body doesn't fit in the pipe, drop and reset session state
607 608 609 610
    if (rc != 0) {
        rc = msg.close ();
        errno_assert (rc == 0);

611 612
        _session->reset ();
        reset_pollin (_handle);
613 614 615
        return;
    }

616 617
    rc = msg.close ();
    errno_assert (rc == 0);
618
    _session->flush ();
619 620
}

621
bool zmq::udp_engine_t::restart_input ()
622
{
623 624 625 626
    if (_recv_enabled) {
        set_pollin (_handle);
        in_event ();
    }
627

628
    return true;
629
}