/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#ifdef ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33 34

#ifdef ZMQ_HAVE_LINUX
35
#include <poll.h>
malosek's avatar
malosek committed
36 37
#endif

Martin Sustrik's avatar
Martin Sustrik committed
38
#include <stdlib.h>
39
#include <string.h>
malosek's avatar
malosek committed
40 41 42 43 44 45
#include <string>

#include "options.hpp"
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
46
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
47
#include "stdint.hpp"
malosek's avatar
malosek committed
48

49
#ifndef MSG_ERRQUEUE
Martin Sustrik's avatar
Martin Sustrik committed
50
#define MSG_ERRQUEUE 0x2000
51 52
#endif

malosek's avatar
malosek committed
53
//  Construct an unopened PGM socket wrapper. No OpenPGM socket exists until
//  init () succeeds.
//  receiver_ selects the receiving (true) or sending (false) role used by
//  init (); options_ is stored in the options member.
zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
    sock (NULL),
    options (options_),
    receiver (receiver_),
    pgm_msgv (NULL),
    pgm_msgv_len (0),
    nbytes_rec (0),
    nbytes_processed (0),
    pgm_msgv_processed (0)
{
    //  get_rx_timeout () and get_tx_timeout () read these fields, which are
    //  otherwise only written by receive (), process_upstream () and send ().
    //  Give them a defined value so a timeout query made before the first
    //  I/O call reports "no timeout" instead of reading indeterminate data.
    //  Assigned in the body so the member declaration order does not matter.
    last_rx_status = PGM_IO_STATUS_NORMAL;
    last_tx_status = PGM_IO_STATUS_NORMAL;
}

65
//  Resolve PGM socket address.
//  network_ of the form <interface & multicast group decls>:<IP port>
//  e.g. eth0;239.192.0.1:7500
//       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//       ;[fe80::1%en0]:7500
//
//  On success returns 0, stores the resolved address info in *res (init ()
//  releases it with pgm_freeaddrinfo) and the port in *port_number.
//  On a malformed address or a resolution failure classified as a user,
//  host or network problem, returns -1 with errno set to EINVAL.
int zmq::pgm_socket_t::init_address (const char *network_,
    struct pgm_addrinfo_t **res, uint16_t *port_number)
{
    //  Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

    //  The port has to be a plain decimal number that fits into 16 bits.
    //  Requiring a leading digit rejects an empty port as well as the sign
    //  and whitespace prefixes strtol would otherwise accept.
    const char *port_str = port_delim + 1;
    if (*port_str < '0' || *port_str > '9') {
        errno = EINVAL;
        return -1;
    }
    char *port_end = NULL;
    const long port = strtol (port_str, &port_end, 10);

    //  Trailing garbage or an out-of-range value (strtol saturates on
    //  overflow, so that case is covered by the range check) is an error
    //  rather than being silently truncated.
    if (*port_end != '\0' || port > 65535) {
        errno = EINVAL;
        return -1;
    }
    *port_number = (uint16_t) port;

    //  Copy the part in front of the port into a zero-filled buffer so that
    //  it is NUL-terminated for pgm_getaddrinfo.
    char network [256];
    if (port_delim - network_ >= (int) sizeof (network) - 1) {
        errno = EINVAL;
        return -1;
    }
    memset (network, '\0', sizeof (network));
    memcpy (network, network_, port_delim - network_);

    pgm_error_t *pgm_error = NULL;

    //  No hints are passed to the resolver.
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {

        //  Invalid parameters don't set pgm_error_t.
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&

              //  NB: cannot catch EAI_BADFLAGS.
            ( pgm_error->code != PGM_ERROR_SERVICE &&
              pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {

            //  User, host, or network configuration or transient error.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }

        //  Fatal OpenPGM internal error.
        zmq_assert (false);
    }
    return 0;
}

//  Create, bind and connect PGM socket.
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
    //  Can not open transport before destroying old one.
    zmq_assert (sock == NULL);
    zmq_assert (options.rate > 0);

    //  Zero counter used in msgrecv.
    nbytes_rec = 0;
    nbytes_processed = 0;
    pgm_msgv_processed = 0;

    uint16_t port_number;
    struct pgm_addrinfo_t *res = NULL;
    sa_family_t sa_family;

    pgm_error_t *pgm_error = NULL;

    if (init_address(network_, &res, &port_number) < 0) {
        goto err_abort;
    }
malosek's avatar
malosek committed
138

139 140
    zmq_assert (res != NULL);

141
    //  Pick up detected IP family.
Steven McCoy's avatar
Steven McCoy committed
142
    sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
malosek's avatar
malosek committed
143

144
    //  Create IP/PGM or UDP/PGM socket.
Martin Sustrik's avatar
Martin Sustrik committed
145
    if (udp_encapsulation_) {
146 147 148 149
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
              &pgm_error)) {

            //  Invalid parameters don't set pgm_error_t.
150
            zmq_assert (pgm_error != NULL);
Steven McCoy's avatar
Steven McCoy committed
151
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
152 153 154 155
                  pgm_error->code != PGM_ERROR_BADF &&
                  pgm_error->code != PGM_ERROR_FAULT &&
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
                  pgm_error->code != PGM_ERROR_FAILED))
156 157

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
158 159
                goto err_abort;

160
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
161
            zmq_assert (false);
162 163
        }

Steven McCoy's avatar
Steven McCoy committed
164 165
        //  All options are of data type int
        const int encapsulation_port = port_number;
166
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
167 168 169
                &encapsulation_port, sizeof (encapsulation_port)))
            goto err_abort;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
170
                &encapsulation_port, sizeof (encapsulation_port)))
Steven McCoy's avatar
Steven McCoy committed
171
            goto err_abort;
172 173 174 175 176 177
    }
    else {
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
              &pgm_error)) {

            //  Invalid parameters don't set pgm_error_t.
178
            zmq_assert (pgm_error != NULL);
Steven McCoy's avatar
Steven McCoy committed
179
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
180 181 182 183
                  pgm_error->code != PGM_ERROR_BADF &&
                  pgm_error->code != PGM_ERROR_FAULT &&
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
                  pgm_error->code != PGM_ERROR_FAILED))
184 185

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
186 187
                goto err_abort;

188
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
189 190
            zmq_assert (false);
        }
malosek's avatar
malosek committed
191 192
    }

Steven McCoy's avatar
Steven McCoy committed
193
    {
194 195 196 197 198 199 200 201 202 203 204 205 206 207
        const int rcvbuf = (int) options.rcvbuf;
        if (rcvbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
                  sizeof (rcvbuf)))
                goto err_abort;
        }

        const int sndbuf = (int) options.sndbuf;
        if (sndbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
                  sizeof (sndbuf)))
                goto err_abort;
        }

208
        const int max_tpdu = (int) options.multicast_maxtpdu;
209 210 211
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
              sizeof (max_tpdu)))
            goto err_abort;
malosek's avatar
malosek committed
212 213
    }

malosek's avatar
malosek committed
214
    if (receiver) {
Steven McCoy's avatar
Steven McCoy committed
215
        const int recv_only        = 1,
216
                  rxw_max_tpdu     = (int) options.multicast_maxtpdu,
217
                  rxw_sqns         = compute_sqns (rxw_max_tpdu),
218
                  peer_expiry      = pgm_secs (300),
Steven McCoy's avatar
Steven McCoy committed
219 220 221 222
                  spmr_expiry      = pgm_msecs (25),
                  nak_bo_ivl       = pgm_msecs (50),
                  nak_rpt_ivl      = pgm_msecs (200),
                  nak_rdata_ivl    = pgm_msecs (200),
223 224
                  nak_data_retries = 50,
                  nak_ncf_retries  = 50;
Steven McCoy's avatar
Steven McCoy committed
225

226 227
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
                sizeof (recv_only)) ||
228 229
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
                sizeof (rxw_sqns)) ||
230 231 232 233 234 235 236 237 238 239 240 241 242 243
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
                sizeof (peer_expiry)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
                sizeof (spmr_expiry)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
                sizeof (nak_bo_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
                sizeof (nak_rpt_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
                &nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
                &nak_data_retries, sizeof (nak_data_retries)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
                &nak_ncf_retries, sizeof (nak_ncf_retries)))
Steven McCoy's avatar
Steven McCoy committed
244
            goto err_abort;
245 246
    }
    else {
Steven McCoy's avatar
Steven McCoy committed
247
        const int send_only        = 1,
Steven McCoy's avatar
Steven McCoy committed
248
                  max_rte      = (int) ((options.rate * 1000) / 8),
249
                  txw_max_tpdu     = (int) options.multicast_maxtpdu,
250
                  txw_sqns         = compute_sqns (txw_max_tpdu),
251 252 253 254 255 256 257 258 259 260
                  ambient_spm      = pgm_secs (30),
                  heartbeat_spm[]  = { pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (1300),
                                       pgm_secs  (7),
                                       pgm_secs  (16),
                                       pgm_secs  (25),
                                       pgm_secs  (30) };
Steven McCoy's avatar
Steven McCoy committed
261

262 263
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
                &send_only, sizeof (send_only)) ||
Steven McCoy's avatar
Steven McCoy committed
264 265
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
                &max_rte, sizeof (max_rte)) ||
266 267
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
                &txw_sqns, sizeof (txw_sqns)) ||
268 269 270 271
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                &ambient_spm, sizeof (ambient_spm)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
                &heartbeat_spm, sizeof (heartbeat_spm)))
Steven McCoy's avatar
Steven McCoy committed
272 273
            goto err_abort;
    }
malosek's avatar
malosek committed
274

Steven McCoy's avatar
Steven McCoy committed
275 276
    //  PGM transport GSI.
    struct pgm_sockaddr_t addr;
Martin Sustrik's avatar
Martin Sustrik committed
277

Steven McCoy's avatar
Steven McCoy committed
278 279 280
    memset (&addr, 0, sizeof(addr));
    addr.sa_port = port_number;
    addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
malosek's avatar
malosek committed
281

282 283 284 285 286 287
    //  Create random GSI.
    uint32_t buf [2];
    buf [0] = generate_random ();
    buf [1] = generate_random ();
    if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
        goto err_abort;
288

malosek's avatar
malosek committed
289

Steven McCoy's avatar
Steven McCoy committed
290 291 292 293 294 295 296 297 298
    //  Bind a transport to the specified network devices.
    struct pgm_interface_req_t if_req;
    memset (&if_req, 0, sizeof(if_req));
    if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
    if_req.ir_scope_id  = 0;
    if (AF_INET6 == sa_family) {
        struct sockaddr_in6 sa6;
        memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
        if_req.ir_scope_id = sa6.sin6_scope_id;
malosek's avatar
malosek committed
299
    }
300 301 302 303
    if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
          &if_req, sizeof (if_req), &pgm_error)) {

        //  Invalid parameters don't set pgm_error_t.
304 305 306 307 308 309
        zmq_assert (pgm_error != NULL);
        if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
             pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
             pgm_error->code != PGM_ERROR_INVAL &&
             pgm_error->code != PGM_ERROR_BADF &&
             pgm_error->code != PGM_ERROR_FAULT))
310 311

            //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
312 313
            goto err_abort;

314
        //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
315
        zmq_assert (false);
malosek's avatar
malosek committed
316 317
    }

318 319 320 321
    //  Join IP multicast groups.
    for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
              &res->ai_recv_addrs [i], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
322 323
            goto err_abort;
    }
324 325
    if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
          &res->ai_send_addrs [0], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
326
        goto err_abort;
327

Steven McCoy's avatar
Steven McCoy committed
328
    pgm_freeaddrinfo (res);
329
    res = NULL;
330

331
    //  Set IP level parameters.
Steven McCoy's avatar
Steven McCoy committed
332
    {
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
        // Multicast loopback disabled by default
        const int multicast_loop = 0;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
              &multicast_loop, sizeof (multicast_loop)))
            goto err_abort;

        const int multicast_hops = options.multicast_hops;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
                &multicast_hops, sizeof (multicast_hops)))
            goto err_abort;

        //  Expedited Forwarding PHB for network elements, no ECN.
        //  Ignore return value due to varied runtime support.
        const int dscp = 0x2e << 2;
        if (AF_INET6 != sa_family)
            pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
               &dscp, sizeof (dscp));

        const int nonblocking = 1;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
              &nonblocking, sizeof (nonblocking)))
            goto err_abort;
355
    }
malosek's avatar
malosek committed
356

357 358
    //  Connect PGM transport to start state machine.
    if (!pgm_connect (sock, &pgm_error)) {
359 360

        //  Invalid parameters don't set pgm_error_t.
361 362 363 364
        zmq_assert (pgm_error != NULL);
        goto err_abort;
    }

Martin Sustrik's avatar
Martin Sustrik committed
365 366 367 368 369 370 371 372 373 374
    //  For receiver transport preallocate pgm_msgv array.
    if (receiver) {
        zmq_assert (in_batch_size > 0);
        size_t max_tsdu_size = get_max_tsdu_size ();
        pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
        if ((int) in_batch_size % max_tsdu_size)
            pgm_msgv_len++;
        zmq_assert (pgm_msgv_len);

        pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
375
        alloc_assert (pgm_msgv);
Martin Sustrik's avatar
Martin Sustrik committed
376 377
    }

malosek's avatar
malosek committed
378
    return 0;
Steven McCoy's avatar
Steven McCoy committed
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394

err_abort:
    if (sock != NULL) {
        pgm_close (sock, FALSE);
        sock = NULL;
    }
    if (res != NULL) {
        pgm_freeaddrinfo (res);
        res = NULL;
    }
    if (pgm_error != NULL) {
        pgm_error_free (pgm_error);
        pgm_error = NULL;
    }
    errno = EINVAL;
    return -1;
malosek's avatar
malosek committed
395 396 397 398
}

//  Release the preallocated pgm_msgv array and close the PGM socket if one
//  is open.
zmq::pgm_socket_t::~pgm_socket_t ()
{
    //  free () is a no-op for NULL, so no guard is needed.
    free (pgm_msgv);

    if (sock != NULL)
        pgm_close (sock, TRUE);
}

405 406
//  Get receiver fds. receive_fd_ is signaled for incoming packets,
//  waiting_pipe_fd_ is signaled for state driven events and data.
407
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
408
    fd_t *waiting_pipe_fd_)
malosek's avatar
malosek committed
409
{
Steven McCoy's avatar
Steven McCoy committed
410 411 412
    socklen_t socklen;
    bool rc;

413 414 415
    zmq_assert (receive_fd_);
    zmq_assert (waiting_pipe_fd_);

Steven McCoy's avatar
Steven McCoy committed
416
    socklen = sizeof (*receive_fd_);
417 418
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
419 420
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));
421

Steven McCoy's avatar
Steven McCoy committed
422
    socklen = sizeof (*waiting_pipe_fd_);
423 424
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
425 426
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
malosek's avatar
malosek committed
427 428
}

429
//  Get fds and store them into user allocated memory.
Steven McCoy's avatar
Steven McCoy committed
430 431 432 433
//  send_fd is for non-blocking send wire notifications.
//  receive_fd_ is for incoming back-channel protocol packets.
//  rdata_notify_fd_ is raised for waiting repair transmissions.
//  pending_notify_fd_ is for state driven events.
434
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
435
    fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
malosek's avatar
malosek committed
436
{
Steven McCoy's avatar
Steven McCoy committed
437 438 439
    socklen_t socklen;
    bool rc;

malosek's avatar
malosek committed
440 441
    zmq_assert (send_fd_);
    zmq_assert (receive_fd_);
442
    zmq_assert (rdata_notify_fd_);
443
    zmq_assert (pending_notify_fd_);
444

Steven McCoy's avatar
Steven McCoy committed
445 446 447 448 449 450
    socklen = sizeof (*send_fd_);
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*receive_fd_);
451 452
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
453 454 455 456
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*rdata_notify_fd_);
457 458
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
459 460 461 462
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*rdata_notify_fd_));

    socklen = sizeof (*pending_notify_fd_);
463 464
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
        pending_notify_fd_, &socklen);
Steven McCoy's avatar
Steven McCoy committed
465 466
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*pending_notify_fd_));
malosek's avatar
malosek committed
467 468 469
}

//  Send one APDU, transmit window owned memory.
Steven McCoy's avatar
Steven McCoy committed
470
//  data_len_ must be less than one TPDU.
malosek's avatar
malosek committed
471 472
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
malosek's avatar
malosek committed
473
    size_t nbytes = 0;
474

Steven McCoy's avatar
Steven McCoy committed
475
    const int status = pgm_send (sock, data_, data_len_, &nbytes);
malosek's avatar
malosek committed
476

477
    //  We have to write all data as one packet.
478 479
    if (nbytes > 0) {
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
Martin Hurton's avatar
Martin Hurton committed
480
        zmq_assert (nbytes == data_len_);
481 482
    }
    else {
483 484
        zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
            status == PGM_IO_STATUS_WOULD_BLOCK);
485 486 487 488 489 490 491

        if (status == PGM_IO_STATUS_RATE_LIMITED)
            errno = ENOMEM;
        else
            errno = EBUSY;
    }

492
    //  Save return value.
493
    last_tx_status = status;
malosek's avatar
malosek committed
494 495 496 497

    return nbytes;
}

498 499
long zmq::pgm_socket_t::get_rx_timeout ()
{
500 501
    if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
          last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
502 503 504 505
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
506 507 508
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
        last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
        PGM_TIME_REMAIN, &tv, &optlen);
509 510 511 512 513 514 515 516 517 518 519 520 521 522
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

long zmq::pgm_socket_t::get_tx_timeout ()
{
    if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
523 524
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
        &optlen);
525 526 527 528 529 530 531
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

malosek's avatar
malosek committed
532
//  Return max TSDU size without fragmentation from current PGM transport.
533
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
malosek's avatar
malosek committed
534
{
Steven McCoy's avatar
Steven McCoy committed
535 536 537 538 539 540 541
    int max_tsdu = 0;
    socklen_t optlen = sizeof (max_tsdu);

    bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
    zmq_assert (rc);
    zmq_assert (optlen == sizeof (max_tsdu));
    return (size_t) max_tsdu;
malosek's avatar
malosek committed
542 543
}

544 545
//  pgm_recvmsgv is called to fill the pgm_msgv array up to  pgm_msgv_len.
//  In subsequent calls data from pgm_msgv structure are returned.
malosek's avatar
malosek committed
546
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
malosek's avatar
malosek committed
547
{
malosek's avatar
malosek committed
548 549
    size_t raw_data_len = 0;

550
    //  We just sent all data from pgm_transport_recvmsgv up
malosek's avatar
malosek committed
551 552 553 554 555 556 557
    //  and have to return 0 that another engine in this thread is scheduled.
    if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {

        //  Reset all the counters.
        nbytes_rec = 0;
        nbytes_processed = 0;
        pgm_msgv_processed = 0;
558
        errno = EAGAIN;
559
        return 0;
malosek's avatar
malosek committed
560 561 562
    }

    //  If we have are going first time or if we have processed all pgm_msgv_t
563
    //  structure previously read from the pgm socket.
malosek's avatar
malosek committed
564 565 566 567 568 569 570
    if (nbytes_rec == nbytes_processed) {

        //  Check program flow.
        zmq_assert (pgm_msgv_processed == 0);
        zmq_assert (nbytes_processed == 0);
        zmq_assert (nbytes_rec == 0);

571
        //  Receive a vector of Application Protocol Domain Unit's (APDUs)
malosek's avatar
malosek committed
572
        //  from the transport.
Steven McCoy's avatar
Steven McCoy committed
573
        pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
574

Steven McCoy's avatar
Steven McCoy committed
575 576
        const int status = pgm_recvmsgv (sock, pgm_msgv,
            pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
577

578
        //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
579
        zmq_assert (status != PGM_IO_STATUS_ERROR);
580

581 582
        last_rx_status = status;

malosek's avatar
malosek committed
583
        //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
Steven McCoy's avatar
Steven McCoy committed
584
        //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
585 586
        if (status == PGM_IO_STATUS_TIMER_PENDING) {

587 588
            zmq_assert (nbytes_rec == 0);

589
            //  In case if no RDATA/ODATA caused POLLIN 0 is
malosek's avatar
malosek committed
590 591
            //  returned.
            nbytes_rec = 0;
592
            errno = EBUSY;
593
            return 0;
malosek's avatar
malosek committed
594
        }
595

Steven McCoy's avatar
Steven McCoy committed
596 597 598 599 600
        //  Send SPMR, NAK, ACK is rate limited.
        if (status == PGM_IO_STATUS_RATE_LIMITED) {

            zmq_assert (nbytes_rec == 0);

601
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
602
            nbytes_rec = 0;
603 604
            errno = ENOMEM;
            return 0;
Steven McCoy's avatar
Steven McCoy committed
605 606 607 608 609 610 611
        }

        //  No peers and hence no incoming packets.
        if (status == PGM_IO_STATUS_WOULD_BLOCK) {

            zmq_assert (nbytes_rec == 0);

612
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
613
            nbytes_rec = 0;
614
            errno = EAGAIN;
615
            return 0;
Steven McCoy's avatar
Steven McCoy committed
616 617
        }

618 619 620
        //  Data loss.
        if (status == PGM_IO_STATUS_RESET) {

621
            struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
622 623

            //  Save lost data TSI.
Steven McCoy's avatar
Steven McCoy committed
624
            *tsi_ = &skb->tsi;
625 626 627
            nbytes_rec = 0;

            //  In case of dala loss -1 is returned.
628
            errno = EINVAL;
Steven McCoy's avatar
Steven McCoy committed
629
            pgm_free_skb (skb);
630 631 632
            return -1;
        }

633
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
malosek's avatar
malosek committed
634
    }
635 636 637 638
    else
    {
        zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
    }
malosek's avatar
malosek committed
639

640
    // Zero byte payloads are valid in PGM, but not 0MQ protocol.
malosek's avatar
malosek committed
641 642
    zmq_assert (nbytes_rec > 0);

malosek's avatar
malosek committed
643 644
    // Only one APDU per pgm_msgv_t structure is allowed.
    zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
645 646

    struct pgm_sk_buff_t* skb =
malosek's avatar
malosek committed
647 648 649 650 651 652 653 654
        pgm_msgv [pgm_msgv_processed].msgv_skb [0];

    //  Take pointers from pgm_msgv_t structure.
    *raw_data_ = skb->data;
    raw_data_len = skb->len;

    //  Save current TSI.
    *tsi_ = &skb->tsi;
malosek's avatar
malosek committed
655 656 657

    //  Move the the next pgm_msgv_t structure.
    pgm_msgv_processed++;
658
    zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
malosek's avatar
malosek committed
659 660 661 662 663
    nbytes_processed +=raw_data_len;

    return raw_data_len;
}

664
void zmq::pgm_socket_t::process_upstream ()
malosek's avatar
malosek committed
665
{
malosek's avatar
malosek committed
666
    pgm_msgv_t dummy_msg;
malosek's avatar
malosek committed
667

malosek's avatar
malosek committed
668
    size_t dummy_bytes = 0;
Steven McCoy's avatar
Steven McCoy committed
669
    pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
670

Steven McCoy's avatar
Steven McCoy committed
671 672
    const int status = pgm_recvmsgv (sock, &dummy_msg,
        1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
malosek's avatar
malosek committed
673

674
    //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
675
    zmq_assert (status != PGM_IO_STATUS_ERROR);
676

malosek's avatar
malosek committed
677
    //  No data should be returned.
678
    zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
679 680
        status == PGM_IO_STATUS_RATE_LIMITED ||
        status == PGM_IO_STATUS_WOULD_BLOCK));
681 682 683 684 685

    last_rx_status = status;

    if (status == PGM_IO_STATUS_TIMER_PENDING)
        errno = EBUSY;
686 687
    else
    if (status == PGM_IO_STATUS_RATE_LIMITED)
688 689 690
        errno = ENOMEM;
    else
        errno = EAGAIN;
malosek's avatar
malosek committed
691 692
}

693 694 695
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
    //  Convert rate into B/ms.
696
    uint64_t rate = uint64_t (options.rate) / 8;
697

698
    //  Compute the size of the buffer in bytes.
699
    uint64_t size = uint64_t (options.recovery_ivl) * rate;
700 701 702 703

    //  Translate the size into number of packets.
    uint64_t sqns = size / tpdu_;

704
    //  Buffer should be able to hold at least one packet.
705 706 707
    if (sqns == 0)
        sqns = 1;

708
    return (int) sqns;
709 710
}

malosek's avatar
malosek committed
711 712
#endif