udp_engine.cpp 5.84 KB
Newer Older
1 2 3 4 5 6
#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <sys/types.h>
7
#include <unistd.h>
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#endif

#include "udp_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"
#include "err.hpp"
#include "ip.hpp"

//  Creates an engine that is not plugged and holds no socket yet.
//  fd starts out as retired_fd because the destructor closes any descriptor
//  that differs from retired_fd; without this an engine destroyed before a
//  successful init () would close an indeterminate descriptor.
//  NOTE(review): the initialiser order assumes the members are declared as
//  plugged, fd, session, ..., address, ..., send_enabled, recv_enabled --
//  confirm against udp_engine.hpp.
zmq::udp_engine_t::udp_engine_t () :
    plugged (false),
    fd (retired_fd),
    session (NULL),
    address (NULL),
    send_enabled (false),
    recv_enabled (false)
{
}

//  Releases the UDP socket, if one is held. The engine must already have
//  been unplugged (see terminate ()) when it is destroyed.
zmq::udp_engine_t::~udp_engine_t ()
{
    zmq_assert (!plugged);

    //  No socket held, nothing to release.
    if (fd == retired_fd)
        return;

#ifdef ZMQ_HAVE_WINDOWS
    const int rc = closesocket (fd);
    wsa_assert (rc != SOCKET_ERROR);
#else
    const int rc = close (fd);
    errno_assert (rc == 0);
#endif

    //  Mark the descriptor as released.
    fd = retired_fd;
}

41
//  Remembers the endpoint and the enabled traffic directions, then opens the
//  UDP socket for the address family of the resolved endpoint.
//
//  address_  endpoint description; must not be NULL. Only the pointer is
//            stored, so the object has to outlive the engine.
//  send_     true if the engine is expected to send datagrams.
//  recv_     true if the engine is expected to receive datagrams.
//            At least one of send_ / recv_ must be true.
//
//  Returns 0 on success, -1 if open_socket () did not yield a socket.
int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
{
    zmq_assert (address_);
    zmq_assert (send_ || recv_);
    send_enabled = send_;
    recv_enabled = recv_;
    address = address_;

    fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM,
                      IPPROTO_UDP);
    if (fd == retired_fd)
        return -1;

    //  NOTE(review): unblock_socket presumably switches the socket to
    //  non-blocking mode -- confirm in ip.hpp.
    unblock_socket (fd);

    return 0;
}

//  Attaches the engine to an I/O thread and to its session, registers the
//  socket with the poller and, when receiving is enabled, binds the socket
//  (joining the multicast group if the endpoint is a multicast address).
//
//  io_thread_  I/O thread whose poller will watch the socket.
//  session_    session the engine exchanges messages with; must not be NULL.
//
//  May be called only once per engine (asserts that the engine is neither
//  plugged nor already attached to a session). Every socket call made here
//  is asserted to succeed.
void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (fd);

    if (send_enabled)
        set_pollout (handle);

    if (recv_enabled) {
        //  Allow the local address to be reused before binding to it.
        int on = 1;
        int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on));
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif

        rc = bind (fd, address->resolved.udp_addr->bind_addr (),
                       address->resolved.udp_addr->bind_addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif

        //  Join the multicast group on the configured interface.
        if (address->resolved.udp_addr->is_mcast ()) {
            struct ip_mreq mreq;
            mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
            mreq.imr_interface = address->resolved.udp_addr->interface_ip ();

            //  Reuse rc instead of declaring a second one that shadows it.
            rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
#ifdef ZMQ_HAVE_WINDOWS
            wsa_assert (rc != SOCKET_ERROR);
#else
            errno_assert (rc == 0);
#endif
        }
        set_pollin (handle);

        //  Call restart output to drop all join/leave commands
        restart_output ();
    }
}

//  Unregisters the socket from the poller, detaches the engine from its
//  I/O thread and destroys the engine object.
void zmq::udp_engine_t::terminate ()
{
    //  Only a plugged engine can be terminated.
    zmq_assert (plugged);
    plugged = false;

    //  Stop polling the socket first, then disconnect from the I/O
    //  thread's poller object.
    rm_fd (handle);
    io_object_t::unplug ();

    //  The object deletes itself; no member may be touched after this line.
    delete this;
}

void zmq::udp_engine_t::out_event()
{
    msg_t group_msg;
    int rc = session->pull_msg (&group_msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));

    if (rc == 0) {
        msg_t body_msg;
        rc = session->pull_msg (&body_msg);

        size_t group_size = group_msg.size ();
        size_t body_size = body_msg.size ();
        size_t size = group_size + body_size + 1;

        // TODO: check if larger than maximum size
        out_buffer[0] = (unsigned char) group_size;
        memcpy (out_buffer + 1, group_msg.data (), group_size);
        memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);

        rc = group_msg.close ();
        errno_assert (rc == 0);

        body_msg.close ();
        errno_assert (rc == 0);

147
#ifdef ZMQ_HAVE_WINDOWS
148 149 150 151
        rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
            address->resolved.udp_addr->dest_addr (),
            (int) address->resolved.udp_addr->dest_addrlen ());
        wsa_assert (rc != SOCKET_ERROR);
152
#else
153 154 155 156
        rc = sendto (fd, out_buffer, size, 0,
            address->resolved.udp_addr->dest_addr (),
            address->resolved.udp_addr->dest_addrlen ());
        errno_assert (rc != -1);
157
#endif
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
    }
    else
       reset_pollout (handle);
}

void zmq::udp_engine_t::restart_output()
{
    //  If we don't support send we just drop all messages
    if (!send_enabled) {
        msg_t msg;
        while (session->pull_msg (&msg) == 0)
            msg.close ();
    }
    else {
        set_pollout(handle);
        out_event ();
    }
}

void zmq::udp_engine_t::in_event()
{
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
#ifdef ZMQ_HAVE_WINDOWS
    int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
    const int last_error = WSAGetLastError();
    if (nbytes == SOCKET_ERROR) {
        wsa_assert(
            last_error == WSAENETDOWN ||
            last_error == WSAENETRESET ||
            last_error == WSAEWOULDBLOCK);
        return;
    }
#else
    int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
    if (nbytes == -1) {
        errno_assert(errno != EBADF
            && errno != EFAULT
            && errno != ENOMEM
            && errno != ENOTSOCK);
        return;
    }
#endif
199

200
    int group_size = in_buffer[0];
201

202 203 204
    //  This doesn't fit, just ingore
    if (nbytes - 1 < group_size)
        return;
205

206
    int body_size = nbytes - 1 - group_size;
207

208 209 210 211 212
    msg_t msg;
    int rc = msg.init_size (group_size);
    errno_assert (rc == 0);
    msg.set_flags (msg_t::more);
    memcpy (msg.data (), in_buffer + 1, group_size);
213

214 215
    rc = session->push_msg (&msg);
    errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
216

217 218
    //  Pipe is full
    if (rc != 0) {
219 220
        rc = msg.close ();
        errno_assert (rc == 0);
221 222 223

        reset_pollin (handle);
        return;
224
    }
225 226 227 228 229 230 231 232 233 234 235

    rc = msg.close ();
    errno_assert (rc == 0);
    rc = msg.init_size (body_size);
    errno_assert (rc == 0);
    memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
    rc = session->push_msg (&msg);
    errno_assert (rc == 0);
    rc = msg.close ();
    errno_assert (rc == 0);
    session->flush ();
236 237 238 239 240 241 242 243 244 245
}

//  Invoked when the session is able to accept inbound messages again.
//  A receiving engine resumes polling for readability and attempts a read
//  immediately; an engine without receive support does nothing.
void zmq::udp_engine_t::restart_input ()
{
    if (recv_enabled) {
        set_pollin (handle);
        in_event ();
    }
}