udp_engine.cpp 10.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

This file is part of libzmq, the ZeroMQ core engine in C++.

libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.

libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
31

32
#if !defined ZMQ_HAVE_WINDOWS
33
#include <sys/types.h>
34
#include <unistd.h>
35 36 37 38 39 40 41 42 43 44 45
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif

#include "udp_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"
#include "err.hpp"
#include "ip.hpp"

46
zmq::udp_engine_t::udp_engine_t(const options_t &options_) :
47
    plugged (false),
48
    fd(-1),
49
    session(NULL),
50
    handle((handle_t)NULL),
51
    address(NULL),
52
    options(options_),
53
    send_enabled(false),
54
    recv_enabled(false)
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
{
}

zmq::udp_engine_t::~udp_engine_t()
{
    zmq_assert (!plugged);

    if (fd != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        int rc = closesocket (fd);
        wsa_assert (rc != SOCKET_ERROR);
#else
        int rc = close (fd);
        errno_assert (rc == 0);
#endif
        fd = retired_fd;
    }
}

74
int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
{
    zmq_assert (address_);
    zmq_assert (send_ || recv_);
    send_enabled = send_;
    recv_enabled = recv_;
    address = address_;

    fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP);
    if (fd == retired_fd)
        return -1;

    unblock_socket (fd);

    return 0;
}

void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (fd);

104 105 106 107 108 109 110 111 112 113
    if (send_enabled) {
        if (!options.raw_socket) {
            out_address = address->resolved.udp_addr->dest_addr ();
            out_addrlen = address->resolved.udp_addr->dest_addrlen ();
        }
        else {
            out_address = (sockaddr *) &raw_address;
            out_addrlen = sizeof (sockaddr_in);
        }

114
        set_pollout (handle);
115
    }
116 117 118 119

    if (recv_enabled) {
        int on = 1;
        int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
120 121 122 123 124 125 126 127 128 129 130 131 132
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif

        rc = bind (fd, address->resolved.udp_addr->bind_addr (),
                       address->resolved.udp_addr->bind_addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif
133 134 135 136 137

        if (address->resolved.udp_addr->is_mcast ()) {
            struct ip_mreq mreq;
            mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
            mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
138
            rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
#ifdef ZMQ_HAVE_WINDOWS
            wsa_assert (rc != SOCKET_ERROR);
#else
            errno_assert (rc == 0);
#endif
        }
        set_pollin (handle);

        //  Call restart output to drop all join/leave commands
        restart_output ();
    }
}

void zmq::udp_engine_t::terminate()
{
    zmq_assert (plugged);
    plugged = false;

    rm_fd (handle);

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    delete this;
}

165 166 167 168 169
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in* addr)
{
    char* name = inet_ntoa(addr->sin_addr);

    char port[6];
170
    sprintf (port, "%d", (int) ntohs (addr->sin_port));
171

172
    int size = (int) strlen (name) + (int) strlen (port) + 1 + 1; //  Colon + NULL
173 174 175 176 177 178 179 180 181 182
    int rc = msg->init_size (size);
    errno_assert (rc == 0);
    msg->set_flags (msg_t::more);
    char *address = (char*)msg->data ();

    strcpy (address, name);
    strcat (address, ":");
    strcat (address, port);
}

183
int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
184
{
185 186 187 188 189 190
    memset (&raw_address, 0, sizeof raw_address);

    const char *delimiter = NULL;

    // Find delimiter, cannot use memrchr as it is not supported on windows
    if (length_ != 0) {
191
        int chars_left = (int) length_;
192 193 194 195 196 197 198 199 200
        char *current_char = name_ + length_;
        do {
            if (*(--current_char) == ':') {
                delimiter = current_char;
                break;
            }
        } while (--chars_left != 0);
    }

201 202 203 204 205 206
    if (!delimiter) {
        errno = EINVAL;
        return -1;
    }

    std::string addr_str (name_, delimiter - name_);
207
    std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227

    //  Parse the port number (0 is not a valid port).
    uint16_t port = (uint16_t) atoi (port_str.c_str ());
    if (port == 0) {
        errno = EINVAL;
        return -1;
    }

    raw_address.sin_family = AF_INET;
    raw_address.sin_port = htons (port);
    raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());

    if (raw_address.sin_addr.s_addr == INADDR_NONE) {
        errno = EINVAL;
        return -1;
    }

    return 0;
}

228 229 230 231 232 233 234 235 236 237 238 239
void zmq::udp_engine_t::out_event()
{
    msg_t group_msg;
    int rc = session->pull_msg (&group_msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        msg_t body_msg;
        rc = session->pull_msg (&body_msg);

        size_t group_size = group_msg.size ();
        size_t body_size = body_msg.size ();
240
        size_t size;
241

242
        if (options.raw_socket) {
243 244 245 246 247 248 249 250 251 252 253
            rc = resolve_raw_address ((char*) group_msg.data(), group_size);

            //  We discard the message if address is not valid
            if (rc != 0) {
                rc = group_msg.close ();
                errno_assert (rc == 0);

                body_msg.close ();
                errno_assert (rc == 0);

                return;
254
            }
255 256 257

            size = body_size;

258 259 260
            memcpy (out_buffer, body_msg.data (), body_size);
        }
        else {
261 262
            size = group_size + body_size + 1;

263 264 265 266 267
            // TODO: check if larger than maximum size
            out_buffer[0] = (unsigned char) group_size;
            memcpy (out_buffer + 1, group_msg.data (), group_size);
            memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
        }
268 269 270 271 272 273 274

        rc = group_msg.close ();
        errno_assert (rc == 0);

        body_msg.close ();
        errno_assert (rc == 0);

275
#ifdef ZMQ_HAVE_WINDOWS
276
        rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
277
            out_address, (int) out_addrlen);
278
        wsa_assert (rc != SOCKET_ERROR);
279
#else
280
        rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen);
281
        errno_assert (rc != -1);
282
#endif
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
    }
    else
       reset_pollout (handle);
}

void zmq::udp_engine_t::restart_output()
{
    //  If we don't support send we just drop all messages
    if (!send_enabled) {
        msg_t msg;
        while (session->pull_msg (&msg) == 0)
            msg.close ();
    }
    else {
        set_pollout(handle);
        out_event ();
    }
}

void zmq::udp_engine_t::in_event()
{
304
  struct sockaddr_in in_address;
305
  socklen_t in_addrlen = sizeof(sockaddr_in);
306
#ifdef ZMQ_HAVE_WINDOWS
307
    int nbytes = recvfrom(fd, (char*) in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen);
308 309 310 311 312 313 314 315 316
    const int last_error = WSAGetLastError();
    if (nbytes == SOCKET_ERROR) {
        wsa_assert(
            last_error == WSAENETDOWN ||
            last_error == WSAENETRESET ||
            last_error == WSAEWOULDBLOCK);
        return;
    }
#else
317
    int nbytes = recvfrom(fd, in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen);
318 319 320 321 322 323 324 325
    if (nbytes == -1) {
        errno_assert(errno != EBADF
            && errno != EFAULT
            && errno != ENOMEM
            && errno != ENOTSOCK);
        return;
    }
#endif
326
    int rc;
327
    int body_size;
328
    int body_offset;
329
    msg_t msg;
330

331
    if (options.raw_socket) {
332 333 334 335
        sockaddr_to_msg (&msg, &in_address);

        body_size = nbytes;
        body_offset = 0;
336 337
    }
    else {
338 339 340 341 342 343 344
        char* group_buffer = (char *)in_buffer + 1;
        int group_size = in_buffer[0];

        rc = msg.init_size (group_size);
        errno_assert (rc == 0);
        msg.set_flags (msg_t::more);
        memcpy (msg.data (), group_buffer, group_size);
345

346 347 348
        //  This doesn't fit, just ingore
        if (nbytes - 1 < group_size)
            return;
349

350
        body_size = nbytes - 1 - group_size;
351
        body_offset = 1 + group_size;
352
    }
353

354 355
    rc = session->push_msg (&msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
356

357 358
    //  Pipe is full
    if (rc != 0) {
359 360
        rc = msg.close ();
        errno_assert (rc == 0);
361 362 363

        reset_pollin (handle);
        return;
364
    }
365 366 367 368 369

    rc = msg.close ();
    errno_assert (rc == 0);
    rc = msg.init_size (body_size);
    errno_assert (rc == 0);
370
    memcpy (msg.data (), in_buffer + body_offset, body_size);
371 372 373 374 375
    rc = session->push_msg (&msg);
    errno_assert (rc == 0);
    rc = msg.close ();
    errno_assert (rc == 0);
    session->flush ();
376 377 378 379 380 381 382 383 384 385
}

void zmq::udp_engine_t::restart_input()
{
    if (!recv_enabled)
        return;

    set_pollin (handle);
    in_event ();
}