pgm_socket.cpp 22.9 KB
Newer Older
malosek's avatar
malosek committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
malosek's avatar
malosek committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
malosek's avatar
malosek committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
malosek's avatar
malosek committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
malosek's avatar
malosek committed
25

26
    You should have received a copy of the GNU Lesser General Public License
malosek's avatar
malosek committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#ifdef ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33 34

#ifdef ZMQ_HAVE_LINUX
35
#include <poll.h>
malosek's avatar
malosek committed
36 37
#endif

Martin Sustrik's avatar
Martin Sustrik committed
38
#include <stdlib.h>
39
#include <string.h>
malosek's avatar
malosek committed
40 41 42 43 44 45
#include <string>

#include "options.hpp"
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
46
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
47
#include "stdint.hpp"
malosek's avatar
malosek committed
48

49
#ifndef MSG_ERRQUEUE
Martin Sustrik's avatar
Martin Sustrik committed
50
#define MSG_ERRQUEUE 0x2000
51 52
#endif

malosek's avatar
malosek committed
53
//  Construct an unopened PGM socket wrapper. No transport exists until
//  init () succeeds.
//  receiver_ selects the receive-only (true) or send-only (false) socket
//  configuration applied in init (); options_ initialises the 'options'
//  member.
zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
    sock (NULL),
    options (options_),
    receiver (receiver_),
    pgm_msgv (NULL),
    pgm_msgv_len (0),
    nbytes_rec (0),
    nbytes_processed (0),
    pgm_msgv_processed (0)
{
    //  get_rx_timeout () and get_tx_timeout () read these members, and
    //  nothing guarantees that receive () / send () has stored a status
    //  first, so give them a defined value. With PGM_IO_STATUS_NORMAL both
    //  getters report "no timeout" (-1). They are assigned here rather than
    //  in the initializer list because the member declaration order is not
    //  visible from this file.
    last_rx_status = PGM_IO_STATUS_NORMAL;
    last_tx_status = PGM_IO_STATUS_NORMAL;
}

65
//  Resolve PGM socket address.
//  network_ of the form <interface & multicast group decls>:<IP port>
//  e.g. eth0;239.192.0.1:7500
//       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//       ;[fe80::1%en0]:7500
//
//  On success returns 0, stores the resolved address information in *res
//  (init () releases it with pgm_freeaddrinfo) and the port in *port_number.
//  On a malformed address or a resolution failure returns -1 with errno set
//  to EINVAL. OpenPGM failures outside the expected set abort via zmq_assert.
int zmq::pgm_socket_t::init_address (const char *network_,
                                     struct pgm_addrinfo_t **res,
                                     uint16_t *port_number)
{
    //  Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

    //  The port has to be a plain decimal number that fits into 16 bits.
    //  Requiring a leading digit rejects the empty string as well as the
    //  whitespace and sign prefixes strtol would otherwise accept.
    const char *port_str = port_delim + 1;
    if (*port_str < '0' || *port_str > '9') {
        errno = EINVAL;
        return -1;
    }
    char *port_end = NULL;
    const long port = strtol (port_str, &port_end, 10);
    if (*port_end != '\0' || port < 0 || port > 65535) {
        errno = EINVAL;
        return -1;
    }
    *port_number = (uint16_t) port;

    //  Copy the part in front of the port delimiter into a zero-terminated
    //  local buffer.
    char network[256];
    if (port_delim - network_ >= (int) sizeof (network) - 1) {
        errno = EINVAL;
        return -1;
    }
    memset (network, '\0', sizeof (network));
    memcpy (network, network_, port_delim - network_);

    //  No hints are supplied to the resolver.
    pgm_error_t *pgm_error = NULL;
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
        //  Invalid parameters don't set pgm_error_t.
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&

            //  NB: cannot catch EAI_BADFLAGS.
            (pgm_error->code != PGM_ERROR_SERVICE
             && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
            //  User, host, or network configuration or transient error.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }

        //  Fatal OpenPGM internal error.
        zmq_assert (false);
    }
    return 0;
}

//  Create, bind and connect PGM socket.
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
    //  Can not open transport before destroying old one.
    zmq_assert (sock == NULL);
    zmq_assert (options.rate > 0);

    //  Zero counter used in msgrecv.
    nbytes_rec = 0;
    nbytes_processed = 0;
    pgm_msgv_processed = 0;

    uint16_t port_number;
    struct pgm_addrinfo_t *res = NULL;
    sa_family_t sa_family;

    pgm_error_t *pgm_error = NULL;

134
    if (init_address (network_, &res, &port_number) < 0) {
135 136
        goto err_abort;
    }
malosek's avatar
malosek committed
137

138 139
    zmq_assert (res != NULL);

140
    //  Pick up detected IP family.
Steven McCoy's avatar
Steven McCoy committed
141
    sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
malosek's avatar
malosek committed
142

143
    //  Create IP/PGM or UDP/PGM socket.
Martin Sustrik's avatar
Martin Sustrik committed
144
    if (udp_encapsulation_) {
145
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
146
                         &pgm_error)) {
147
            //  Invalid parameters don't set pgm_error_t.
148
            zmq_assert (pgm_error != NULL);
149 150 151 152 153
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
                && (pgm_error->code != PGM_ERROR_BADF
                    && pgm_error->code != PGM_ERROR_FAULT
                    && pgm_error->code != PGM_ERROR_NOPROTOOPT
                    && pgm_error->code != PGM_ERROR_FAILED))
154 155

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
156 157
                goto err_abort;

158
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
159
            zmq_assert (false);
160 161
        }

Steven McCoy's avatar
Steven McCoy committed
162 163
        //  All options are of data type int
        const int encapsulation_port = port_number;
164
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
165
                             &encapsulation_port, sizeof (encapsulation_port)))
166 167
            goto err_abort;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
168
                             &encapsulation_port, sizeof (encapsulation_port)))
Steven McCoy's avatar
Steven McCoy committed
169
            goto err_abort;
170
    } else {
171
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
172
                         &pgm_error)) {
173
            //  Invalid parameters don't set pgm_error_t.
174
            zmq_assert (pgm_error != NULL);
175 176 177 178 179
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
                && (pgm_error->code != PGM_ERROR_BADF
                    && pgm_error->code != PGM_ERROR_FAULT
                    && pgm_error->code != PGM_ERROR_NOPROTOOPT
                    && pgm_error->code != PGM_ERROR_FAILED))
180 181

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
182 183
                goto err_abort;

184
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
185 186
            zmq_assert (false);
        }
malosek's avatar
malosek committed
187 188
    }

Steven McCoy's avatar
Steven McCoy committed
189
    {
190 191 192
        const int rcvbuf = (int) options.rcvbuf;
        if (rcvbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
193
                                 sizeof (rcvbuf)))
194 195 196 197 198 199
                goto err_abort;
        }

        const int sndbuf = (int) options.sndbuf;
        if (sndbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
200
                                 sizeof (sndbuf)))
201 202 203
                goto err_abort;
        }

204
        const int max_tpdu = (int) options.multicast_maxtpdu;
205
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
206
                             sizeof (max_tpdu)))
207
            goto err_abort;
malosek's avatar
malosek committed
208 209
    }

malosek's avatar
malosek committed
210
    if (receiver) {
211 212 213 214 215 216
        const int recv_only = 1, rxw_max_tpdu = (int) options.multicast_maxtpdu,
                  rxw_sqns = compute_sqns (rxw_max_tpdu),
                  peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25),
                  nak_bo_ivl = pgm_msecs (50), nak_rpt_ivl = pgm_msecs (200),
                  nak_rdata_ivl = pgm_msecs (200), nak_data_retries = 50,
                  nak_ncf_retries = 50;
Steven McCoy's avatar
Steven McCoy committed
217

218
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
                             sizeof (recv_only))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
                                sizeof (rxw_sqns))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY,
                                &peer_expiry, sizeof (peer_expiry))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY,
                                &spmr_expiry, sizeof (spmr_expiry))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
                                sizeof (nak_bo_ivl))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL,
                                &nak_rpt_ivl, sizeof (nak_rpt_ivl))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
                                &nak_rdata_ivl, sizeof (nak_rdata_ivl))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
                                &nak_data_retries, sizeof (nak_data_retries))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
                                &nak_ncf_retries, sizeof (nak_ncf_retries)))
Steven McCoy's avatar
Steven McCoy committed
236
            goto err_abort;
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
    } else {
        const int send_only = 1, max_rte = (int) ((options.rate * 1000) / 8),
                  txw_max_tpdu = (int) options.multicast_maxtpdu,
                  txw_sqns = compute_sqns (txw_max_tpdu),
                  ambient_spm = pgm_secs (30),
                  heartbeat_spm[] = {
                    pgm_msecs (100), pgm_msecs (100),  pgm_msecs (100),
                    pgm_msecs (100), pgm_msecs (1300), pgm_secs (7),
                    pgm_secs (16),   pgm_secs (25),    pgm_secs (30)};

        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only,
                             sizeof (send_only))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, &max_rte,
                                sizeof (max_rte))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, &txw_sqns,
                                sizeof (txw_sqns))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                                &ambient_spm, sizeof (ambient_spm))
            || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
                                &heartbeat_spm, sizeof (heartbeat_spm)))
Steven McCoy's avatar
Steven McCoy committed
257 258
            goto err_abort;
    }
malosek's avatar
malosek committed
259

Steven McCoy's avatar
Steven McCoy committed
260 261
    //  PGM transport GSI.
    struct pgm_sockaddr_t addr;
Martin Sustrik's avatar
Martin Sustrik committed
262

263
    memset (&addr, 0, sizeof (addr));
Steven McCoy's avatar
Steven McCoy committed
264 265
    addr.sa_port = port_number;
    addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
malosek's avatar
malosek committed
266

267
    //  Create random GSI.
268 269 270 271
    uint32_t buf[2];
    buf[0] = generate_random ();
    buf[1] = generate_random ();
    if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t *) buf, 8))
272
        goto err_abort;
273

malosek's avatar
malosek committed
274

Steven McCoy's avatar
Steven McCoy committed
275 276
    //  Bind a transport to the specified network devices.
    struct pgm_interface_req_t if_req;
277
    memset (&if_req, 0, sizeof (if_req));
Steven McCoy's avatar
Steven McCoy committed
278
    if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
279
    if_req.ir_scope_id = 0;
Steven McCoy's avatar
Steven McCoy committed
280 281 282 283
    if (AF_INET6 == sa_family) {
        struct sockaddr_in6 sa6;
        memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
        if_req.ir_scope_id = sa6.sin6_scope_id;
malosek's avatar
malosek committed
284
    }
285
    if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
286
                    &if_req, sizeof (if_req), &pgm_error)) {
287
        //  Invalid parameters don't set pgm_error_t.
288
        zmq_assert (pgm_error != NULL);
289 290 291 292 293
        if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
             || pgm_error->domain == PGM_ERROR_DOMAIN_IF)
            && (pgm_error->code != PGM_ERROR_INVAL
                && pgm_error->code != PGM_ERROR_BADF
                && pgm_error->code != PGM_ERROR_FAULT))
294 295

            //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
296 297
            goto err_abort;

298
        //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
299
        zmq_assert (false);
malosek's avatar
malosek committed
300 301
    }

302 303 304
    //  Join IP multicast groups.
    for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
305
                             &res->ai_recv_addrs[i], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
306 307
            goto err_abort;
    }
308
    if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
309
                         &res->ai_send_addrs[0], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
310
        goto err_abort;
311

Steven McCoy's avatar
Steven McCoy committed
312
    pgm_freeaddrinfo (res);
313
    res = NULL;
314

315
    //  Set IP level parameters.
Steven McCoy's avatar
Steven McCoy committed
316
    {
317 318 319
        // Multicast loopback disabled by default
        const int multicast_loop = 0;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
320
                             &multicast_loop, sizeof (multicast_loop)))
321 322 323 324
            goto err_abort;

        const int multicast_hops = options.multicast_hops;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
325
                             &multicast_hops, sizeof (multicast_hops)))
326 327 328 329 330 331
            goto err_abort;

        //  Expedited Forwarding PHB for network elements, no ECN.
        //  Ignore return value due to varied runtime support.
        const int dscp = 0x2e << 2;
        if (AF_INET6 != sa_family)
332
            pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp));
333 334

        const int nonblocking = 1;
335 336
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking,
                             sizeof (nonblocking)))
337
            goto err_abort;
338
    }
malosek's avatar
malosek committed
339

340 341
    //  Connect PGM transport to start state machine.
    if (!pgm_connect (sock, &pgm_error)) {
342
        //  Invalid parameters don't set pgm_error_t.
343 344 345 346
        zmq_assert (pgm_error != NULL);
        goto err_abort;
    }

Martin Sustrik's avatar
Martin Sustrik committed
347 348 349 350 351 352 353 354 355
    //  For receiver transport preallocate pgm_msgv array.
    if (receiver) {
        zmq_assert (in_batch_size > 0);
        size_t max_tsdu_size = get_max_tsdu_size ();
        pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
        if ((int) in_batch_size % max_tsdu_size)
            pgm_msgv_len++;
        zmq_assert (pgm_msgv_len);

356
        pgm_msgv = (pgm_msgv_t *) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
357
        alloc_assert (pgm_msgv);
Martin Sustrik's avatar
Martin Sustrik committed
358 359
    }

malosek's avatar
malosek committed
360
    return 0;
Steven McCoy's avatar
Steven McCoy committed
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376

err_abort:
    if (sock != NULL) {
        pgm_close (sock, FALSE);
        sock = NULL;
    }
    if (res != NULL) {
        pgm_freeaddrinfo (res);
        res = NULL;
    }
    if (pgm_error != NULL) {
        pgm_error_free (pgm_error);
        pgm_error = NULL;
    }
    errno = EINVAL;
    return -1;
malosek's avatar
malosek committed
377 378 379 380
}

//  Release the preallocated receive vector and close the PGM socket, when
//  they exist.
zmq::pgm_socket_t::~pgm_socket_t ()
{
    if (pgm_msgv != NULL) {
        free (pgm_msgv);
    }

    //  Note that TRUE is passed as the second pgm_close argument here, while
    //  the failure path of init () passes FALSE.
    if (sock != NULL) {
        pgm_close (sock, TRUE);
    }
}

387 388
//  Get receiver fds. receive_fd_ is signaled for incoming packets,
//  waiting_pipe_fd_ is signaled for state driven events and data.
389
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
390
                                          fd_t *waiting_pipe_fd_)
malosek's avatar
malosek committed
391
{
Steven McCoy's avatar
Steven McCoy committed
392 393 394
    socklen_t socklen;
    bool rc;

395 396 397
    zmq_assert (receive_fd_);
    zmq_assert (waiting_pipe_fd_);

Steven McCoy's avatar
Steven McCoy committed
398
    socklen = sizeof (*receive_fd_);
399 400
    rc =
      pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
Steven McCoy's avatar
Steven McCoy committed
401 402
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));
403

Steven McCoy's avatar
Steven McCoy committed
404
    socklen = sizeof (*waiting_pipe_fd_);
405
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
406
                         &socklen);
Steven McCoy's avatar
Steven McCoy committed
407 408
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
malosek's avatar
malosek committed
409 410
}

411
//  Get fds and store them into user allocated memory.
Steven McCoy's avatar
Steven McCoy committed
412 413 414 415
//  send_fd is for non-blocking send wire notifications.
//  receive_fd_ is for incoming back-channel protocol packets.
//  rdata_notify_fd_ is raised for waiting repair transmissions.
//  pending_notify_fd_ is for state driven events.
416 417 418 419
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_,
                                        fd_t *receive_fd_,
                                        fd_t *rdata_notify_fd_,
                                        fd_t *pending_notify_fd_)
malosek's avatar
malosek committed
420
{
Steven McCoy's avatar
Steven McCoy committed
421 422 423
    socklen_t socklen;
    bool rc;

malosek's avatar
malosek committed
424 425
    zmq_assert (send_fd_);
    zmq_assert (receive_fd_);
426
    zmq_assert (rdata_notify_fd_);
427
    zmq_assert (pending_notify_fd_);
428

Steven McCoy's avatar
Steven McCoy committed
429 430 431 432 433 434
    socklen = sizeof (*send_fd_);
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*receive_fd_);
435 436
    rc =
      pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
Steven McCoy's avatar
Steven McCoy committed
437 438 439 440
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*rdata_notify_fd_);
441
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
442
                         &socklen);
Steven McCoy's avatar
Steven McCoy committed
443 444 445 446
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*rdata_notify_fd_));

    socklen = sizeof (*pending_notify_fd_);
447
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
448
                         pending_notify_fd_, &socklen);
Steven McCoy's avatar
Steven McCoy committed
449 450
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*pending_notify_fd_));
malosek's avatar
malosek committed
451 452 453
}

//  Send one APDU, transmit window owned memory.
Steven McCoy's avatar
Steven McCoy committed
454
//  data_len_ must be less than one TPDU.
malosek's avatar
malosek committed
455 456
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
malosek's avatar
malosek committed
457
    size_t nbytes = 0;
458

Steven McCoy's avatar
Steven McCoy committed
459
    const int status = pgm_send (sock, data_, data_len_, &nbytes);
malosek's avatar
malosek committed
460

461
    //  We have to write all data as one packet.
462 463
    if (nbytes > 0) {
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
Martin Hurton's avatar
Martin Hurton committed
464
        zmq_assert (nbytes == data_len_);
465 466 467
    } else {
        zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED
                    || status == PGM_IO_STATUS_WOULD_BLOCK);
468 469 470 471 472 473 474

        if (status == PGM_IO_STATUS_RATE_LIMITED)
            errno = ENOMEM;
        else
            errno = EBUSY;
    }

475
    //  Save return value.
476
    last_tx_status = status;
malosek's avatar
malosek committed
477 478 479 480

    return nbytes;
}

481 482
long zmq::pgm_socket_t::get_rx_timeout ()
{
483 484
    if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED
        && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
485 486 487 488
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
489
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
490 491 492 493
                                    last_rx_status == PGM_IO_STATUS_RATE_LIMITED
                                      ? PGM_RATE_REMAIN
                                      : PGM_TIME_REMAIN,
                                    &tv, &optlen);
494 495 496 497 498 499 500 501 502 503 504 505 506 507
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

long zmq::pgm_socket_t::get_tx_timeout ()
{
    if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
508 509
    const bool rc =
      pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
510 511 512 513 514 515 516
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

malosek's avatar
malosek committed
517
//  Return max TSDU size without fragmentation from current PGM transport.
518
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
malosek's avatar
malosek committed
519
{
Steven McCoy's avatar
Steven McCoy committed
520 521 522 523 524 525 526
    int max_tsdu = 0;
    socklen_t optlen = sizeof (max_tsdu);

    bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
    zmq_assert (rc);
    zmq_assert (optlen == sizeof (max_tsdu));
    return (size_t) max_tsdu;
malosek's avatar
malosek committed
527 528
}

529 530
//  pgm_recvmsgv is called to fill the pgm_msgv array up to  pgm_msgv_len.
//  In subsequent calls data from pgm_msgv structure are returned.
malosek's avatar
malosek committed
531
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
malosek's avatar
malosek committed
532
{
malosek's avatar
malosek committed
533 534
    size_t raw_data_len = 0;

535
    //  We just sent all data from pgm_transport_recvmsgv up
malosek's avatar
malosek committed
536 537 538 539 540 541
    //  and have to return 0 that another engine in this thread is scheduled.
    if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
        //  Reset all the counters.
        nbytes_rec = 0;
        nbytes_processed = 0;
        pgm_msgv_processed = 0;
542
        errno = EAGAIN;
543
        return 0;
malosek's avatar
malosek committed
544 545 546
    }

    //  If we have are going first time or if we have processed all pgm_msgv_t
547
    //  structure previously read from the pgm socket.
malosek's avatar
malosek committed
548 549 550 551 552 553
    if (nbytes_rec == nbytes_processed) {
        //  Check program flow.
        zmq_assert (pgm_msgv_processed == 0);
        zmq_assert (nbytes_processed == 0);
        zmq_assert (nbytes_rec == 0);

554
        //  Receive a vector of Application Protocol Domain Unit's (APDUs)
malosek's avatar
malosek committed
555
        //  from the transport.
Steven McCoy's avatar
Steven McCoy committed
556
        pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
557

558 559
        const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len,
                                         MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
560

561
        //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
562
        zmq_assert (status != PGM_IO_STATUS_ERROR);
563

564 565
        last_rx_status = status;

malosek's avatar
malosek committed
566
        //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
Steven McCoy's avatar
Steven McCoy committed
567
        //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
568
        if (status == PGM_IO_STATUS_TIMER_PENDING) {
569 570
            zmq_assert (nbytes_rec == 0);

571
            //  In case if no RDATA/ODATA caused POLLIN 0 is
malosek's avatar
malosek committed
572 573
            //  returned.
            nbytes_rec = 0;
574
            errno = EBUSY;
575
            return 0;
malosek's avatar
malosek committed
576
        }
577

Steven McCoy's avatar
Steven McCoy committed
578 579 580 581
        //  Send SPMR, NAK, ACK is rate limited.
        if (status == PGM_IO_STATUS_RATE_LIMITED) {
            zmq_assert (nbytes_rec == 0);

582
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
583
            nbytes_rec = 0;
584 585
            errno = ENOMEM;
            return 0;
Steven McCoy's avatar
Steven McCoy committed
586 587 588 589 590 591
        }

        //  No peers and hence no incoming packets.
        if (status == PGM_IO_STATUS_WOULD_BLOCK) {
            zmq_assert (nbytes_rec == 0);

592
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
593
            nbytes_rec = 0;
594
            errno = EAGAIN;
595
            return 0;
Steven McCoy's avatar
Steven McCoy committed
596 597
        }

598 599
        //  Data loss.
        if (status == PGM_IO_STATUS_RESET) {
600
            struct pgm_sk_buff_t *skb = pgm_msgv[0].msgv_skb[0];
601 602

            //  Save lost data TSI.
Steven McCoy's avatar
Steven McCoy committed
603
            *tsi_ = &skb->tsi;
604 605 606
            nbytes_rec = 0;

            //  In case of dala loss -1 is returned.
607
            errno = EINVAL;
Steven McCoy's avatar
Steven McCoy committed
608
            pgm_free_skb (skb);
609 610 611
            return -1;
        }

612
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
613
    } else {
614 615
        zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
    }
malosek's avatar
malosek committed
616

617
    // Zero byte payloads are valid in PGM, but not 0MQ protocol.
malosek's avatar
malosek committed
618 619
    zmq_assert (nbytes_rec > 0);

malosek's avatar
malosek committed
620
    // Only one APDU per pgm_msgv_t structure is allowed.
621
    zmq_assert (pgm_msgv[pgm_msgv_processed].msgv_len == 1);
622

623
    struct pgm_sk_buff_t *skb = pgm_msgv[pgm_msgv_processed].msgv_skb[0];
malosek's avatar
malosek committed
624 625 626 627 628 629 630

    //  Take pointers from pgm_msgv_t structure.
    *raw_data_ = skb->data;
    raw_data_len = skb->len;

    //  Save current TSI.
    *tsi_ = &skb->tsi;
malosek's avatar
malosek committed
631 632 633

    //  Move the the next pgm_msgv_t structure.
    pgm_msgv_processed++;
634
    zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
635
    nbytes_processed += raw_data_len;
malosek's avatar
malosek committed
636 637 638 639

    return raw_data_len;
}

640
void zmq::pgm_socket_t::process_upstream ()
malosek's avatar
malosek committed
641
{
malosek's avatar
malosek committed
642
    pgm_msgv_t dummy_msg;
malosek's avatar
malosek committed
643

malosek's avatar
malosek committed
644
    size_t dummy_bytes = 0;
Steven McCoy's avatar
Steven McCoy committed
645
    pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
646

647 648
    const int status = pgm_recvmsgv (sock, &dummy_msg, 1, MSG_ERRQUEUE,
                                     &dummy_bytes, &pgm_error);
malosek's avatar
malosek committed
649

650
    //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
651
    zmq_assert (status != PGM_IO_STATUS_ERROR);
652

malosek's avatar
malosek committed
653
    //  No data should be returned.
654 655 656 657
    zmq_assert (dummy_bytes == 0
                && (status == PGM_IO_STATUS_TIMER_PENDING
                    || status == PGM_IO_STATUS_RATE_LIMITED
                    || status == PGM_IO_STATUS_WOULD_BLOCK));
658 659 660 661 662

    last_rx_status = status;

    if (status == PGM_IO_STATUS_TIMER_PENDING)
        errno = EBUSY;
663
    else if (status == PGM_IO_STATUS_RATE_LIMITED)
664 665 666
        errno = ENOMEM;
    else
        errno = EAGAIN;
malosek's avatar
malosek committed
667 668
}

669 670 671
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
    //  Convert rate into B/ms.
672
    uint64_t rate = uint64_t (options.rate) / 8;
673

674
    //  Compute the size of the buffer in bytes.
675
    uint64_t size = uint64_t (options.recovery_ivl) * rate;
676 677 678 679

    //  Translate the size into number of packets.
    uint64_t sqns = size / tpdu_;

680
    //  Buffer should be able to hold at least one packet.
681 682 683
    if (sqns == 0)
        sqns = 1;

684
    return (int) sqns;
685 686
}

malosek's avatar
malosek committed
687
#endif