pgm_socket.cpp 22.6 KB
Newer Older
malosek's avatar
malosek committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
malosek's avatar
malosek committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
malosek's avatar
malosek committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
malosek's avatar
malosek committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
malosek's avatar
malosek committed
25

26
    You should have received a copy of the GNU Lesser General Public License
malosek's avatar
malosek committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31 32
#include "platform.hpp"

malosek's avatar
malosek committed
33
#ifdef ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
34

35 36 37 38
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif

malosek's avatar
malosek committed
39
#ifdef ZMQ_HAVE_LINUX
40
#include <poll.h>
malosek's avatar
malosek committed
41 42
#endif

Martin Sustrik's avatar
Martin Sustrik committed
43
#include <stdlib.h>
44
#include <string.h>
malosek's avatar
malosek committed
45 46 47 48 49 50
#include <string>

#include "options.hpp"
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
51
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
52
#include "stdint.hpp"
malosek's avatar
malosek committed
53

54
#ifndef MSG_ERRQUEUE
Martin Sustrik's avatar
Martin Sustrik committed
55
#define MSG_ERRQUEUE 0x2000
56 57
#endif

malosek's avatar
malosek committed
58
zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
Steven McCoy's avatar
Steven McCoy committed
59
    sock (NULL),
malosek's avatar
malosek committed
60 61 62
    options (options_),
    receiver (receiver_),
    pgm_msgv (NULL),
63
    pgm_msgv_len (0),
malosek's avatar
malosek committed
64 65
    nbytes_rec (0),
    nbytes_processed (0),
66
    pgm_msgv_processed (0)
malosek's avatar
malosek committed
67 68 69
{
}

70
//  Resolve PGM socket address.
71 72 73 74
//  network_ of the form <interface & multicast group decls>:<IP port>
//  e.g. eth0;239.192.0.1:7500
//       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//       ;[fe80::1%en0]:7500
75
int zmq::pgm_socket_t::init_address (const char *network_,
Ian Barber's avatar
Ian Barber committed
76
    struct pgm_addrinfo_t **res, uint16_t *port_number)
malosek's avatar
malosek committed
77
{
Steven McCoy's avatar
Steven McCoy committed
78 79
    //  Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
malosek's avatar
malosek committed
80 81 82 83 84
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

85
    *port_number = atoi (port_delim + 1);
86

Martin Sustrik's avatar
Martin Sustrik committed
87
    char network [256];
88
    if (port_delim - network_ >= (int) sizeof (network) - 1) {
malosek's avatar
malosek committed
89 90 91 92
        errno = EINVAL;
        return -1;
    }
    memset (network, '\0', sizeof (network));
93
    memcpy (network, network_, port_delim - network_);
malosek's avatar
malosek committed
94

Steven McCoy's avatar
Steven McCoy committed
95
    pgm_error_t *pgm_error = NULL;
Ian Barber's avatar
Ian Barber committed
96
    struct pgm_addrinfo_t hints;
Steven McCoy's avatar
Steven McCoy committed
97 98 99

    memset (&hints, 0, sizeof (hints));
    hints.ai_family = AF_UNSPEC;
Ian Barber's avatar
Ian Barber committed
100
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
101 102

        //  Invalid parameters don't set pgm_error_t.
103
        zmq_assert (pgm_error != NULL);
104
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
105 106

              //  NB: cannot catch EAI_BADFLAGS.
107 108
            ( pgm_error->code != PGM_ERROR_SERVICE &&
              pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
109 110

            //  User, host, or network configuration or transient error.
111 112 113 114
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }
Steven McCoy's avatar
Steven McCoy committed
115

116
        //  Fatal OpenPGM internal error.
117
        zmq_assert (false);
malosek's avatar
malosek committed
118
    }
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
    return 0;
}

//  Create, bind and connect PGM socket.
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
    //  Can not open transport before destroying old one.
    zmq_assert (sock == NULL);
    zmq_assert (options.rate > 0);

    //  Zero counter used in msgrecv.
    nbytes_rec = 0;
    nbytes_processed = 0;
    pgm_msgv_processed = 0;

    uint16_t port_number;
    struct pgm_addrinfo_t *res = NULL;
    sa_family_t sa_family;

    pgm_error_t *pgm_error = NULL;

    if (init_address(network_, &res, &port_number) < 0) {
        goto err_abort;
    }
malosek's avatar
malosek committed
143

144 145
    zmq_assert (res != NULL);

146
    //  Pick up detected IP family.
Steven McCoy's avatar
Steven McCoy committed
147
    sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
malosek's avatar
malosek committed
148

149
    //  Create IP/PGM or UDP/PGM socket.
Martin Sustrik's avatar
Martin Sustrik committed
150
    if (udp_encapsulation_) {
151 152 153 154
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
              &pgm_error)) {

            //  Invalid parameters don't set pgm_error_t.
155
            zmq_assert (pgm_error != NULL);
Steven McCoy's avatar
Steven McCoy committed
156
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
157 158 159 160
                  pgm_error->code != PGM_ERROR_BADF &&
                  pgm_error->code != PGM_ERROR_FAULT &&
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
                  pgm_error->code != PGM_ERROR_FAILED))
161 162

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
163 164
                goto err_abort;

165
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
166
            zmq_assert (false);
167 168
        }

Steven McCoy's avatar
Steven McCoy committed
169 170
        //  All options are of data type int
        const int encapsulation_port = port_number;
171
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
172 173 174
                &encapsulation_port, sizeof (encapsulation_port)))
            goto err_abort;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
175
                &encapsulation_port, sizeof (encapsulation_port)))
Steven McCoy's avatar
Steven McCoy committed
176
            goto err_abort;
177 178 179 180 181 182
    }
    else {
        if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
              &pgm_error)) {

            //  Invalid parameters don't set pgm_error_t.
183
            zmq_assert (pgm_error != NULL);
Steven McCoy's avatar
Steven McCoy committed
184
            if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
185 186 187 188
                  pgm_error->code != PGM_ERROR_BADF &&
                  pgm_error->code != PGM_ERROR_FAULT &&
                  pgm_error->code != PGM_ERROR_NOPROTOOPT &&
                  pgm_error->code != PGM_ERROR_FAILED))
189 190

                //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
191 192
                goto err_abort;

193
            //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
194 195
            zmq_assert (false);
        }
malosek's avatar
malosek committed
196 197
    }

Steven McCoy's avatar
Steven McCoy committed
198
    {
199 200 201 202 203 204 205 206 207 208 209 210 211 212
        const int rcvbuf = (int) options.rcvbuf;
        if (rcvbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
                  sizeof (rcvbuf)))
                goto err_abort;
        }

        const int sndbuf = (int) options.sndbuf;
        if (sndbuf >= 0) {
            if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
                  sizeof (sndbuf)))
                goto err_abort;
        }

213
        const int max_tpdu = (int) options.multicast_maxtpdu;
214 215 216
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
              sizeof (max_tpdu)))
            goto err_abort;
malosek's avatar
malosek committed
217 218
    }

malosek's avatar
malosek committed
219
    if (receiver) {
Steven McCoy's avatar
Steven McCoy committed
220
        const int recv_only        = 1,
221
                  rxw_max_tpdu     = (int) options.multicast_maxtpdu,
222
                  rxw_sqns         = compute_sqns (rxw_max_tpdu),
223
                  peer_expiry      = pgm_secs (300),
Steven McCoy's avatar
Steven McCoy committed
224 225 226 227
                  spmr_expiry      = pgm_msecs (25),
                  nak_bo_ivl       = pgm_msecs (50),
                  nak_rpt_ivl      = pgm_msecs (200),
                  nak_rdata_ivl    = pgm_msecs (200),
228 229
                  nak_data_retries = 50,
                  nak_ncf_retries  = 50;
Steven McCoy's avatar
Steven McCoy committed
230

231 232
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
                sizeof (recv_only)) ||
233 234
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
                sizeof (rxw_sqns)) ||
235 236 237 238 239 240 241 242 243 244 245 246 247 248
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
                sizeof (peer_expiry)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
                sizeof (spmr_expiry)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
                sizeof (nak_bo_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
                sizeof (nak_rpt_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
                &nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
                &nak_data_retries, sizeof (nak_data_retries)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
                &nak_ncf_retries, sizeof (nak_ncf_retries)))
Steven McCoy's avatar
Steven McCoy committed
249
            goto err_abort;
250 251
    }
    else {
Steven McCoy's avatar
Steven McCoy committed
252
        const int send_only        = 1,
Steven McCoy's avatar
Steven McCoy committed
253
                  max_rte      = (int) ((options.rate * 1000) / 8),
254
                  txw_max_tpdu     = (int) options.multicast_maxtpdu,
255
                  txw_sqns         = compute_sqns (txw_max_tpdu),
256 257 258 259 260 261 262 263 264 265
                  ambient_spm      = pgm_secs (30),
                  heartbeat_spm[]  = { pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (100),
                                       pgm_msecs (1300),
                                       pgm_secs  (7),
                                       pgm_secs  (16),
                                       pgm_secs  (25),
                                       pgm_secs  (30) };
Steven McCoy's avatar
Steven McCoy committed
266

267 268
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
                &send_only, sizeof (send_only)) ||
Steven McCoy's avatar
Steven McCoy committed
269 270
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
                &max_rte, sizeof (max_rte)) ||
271 272
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
                &txw_sqns, sizeof (txw_sqns)) ||
273 274 275 276
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
                &ambient_spm, sizeof (ambient_spm)) ||
            !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
                &heartbeat_spm, sizeof (heartbeat_spm)))
Steven McCoy's avatar
Steven McCoy committed
277 278
            goto err_abort;
    }
malosek's avatar
malosek committed
279

Steven McCoy's avatar
Steven McCoy committed
280 281
    //  PGM transport GSI.
    struct pgm_sockaddr_t addr;
Martin Sustrik's avatar
Martin Sustrik committed
282

Steven McCoy's avatar
Steven McCoy committed
283 284 285
    memset (&addr, 0, sizeof(addr));
    addr.sa_port = port_number;
    addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
malosek's avatar
malosek committed
286

287 288 289 290 291 292
    //  Create random GSI.
    uint32_t buf [2];
    buf [0] = generate_random ();
    buf [1] = generate_random ();
    if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
        goto err_abort;
293

malosek's avatar
malosek committed
294

Steven McCoy's avatar
Steven McCoy committed
295 296 297 298 299 300 301 302 303
    //  Bind a transport to the specified network devices.
    struct pgm_interface_req_t if_req;
    memset (&if_req, 0, sizeof(if_req));
    if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
    if_req.ir_scope_id  = 0;
    if (AF_INET6 == sa_family) {
        struct sockaddr_in6 sa6;
        memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
        if_req.ir_scope_id = sa6.sin6_scope_id;
malosek's avatar
malosek committed
304
    }
305 306 307 308
    if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
          &if_req, sizeof (if_req), &pgm_error)) {

        //  Invalid parameters don't set pgm_error_t.
309 310 311 312 313 314
        zmq_assert (pgm_error != NULL);
        if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
             pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
             pgm_error->code != PGM_ERROR_INVAL &&
             pgm_error->code != PGM_ERROR_BADF &&
             pgm_error->code != PGM_ERROR_FAULT))
315 316

            //  User, host, or network configuration or transient error.
Steven McCoy's avatar
Steven McCoy committed
317 318
            goto err_abort;

319
        //  Fatal OpenPGM internal error.
Steven McCoy's avatar
Steven McCoy committed
320
        zmq_assert (false);
malosek's avatar
malosek committed
321 322
    }

323 324 325 326
    //  Join IP multicast groups.
    for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
              &res->ai_recv_addrs [i], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
327 328
            goto err_abort;
    }
329 330
    if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
          &res->ai_send_addrs [0], sizeof (struct group_req)))
Steven McCoy's avatar
Steven McCoy committed
331
        goto err_abort;
332

Steven McCoy's avatar
Steven McCoy committed
333
    pgm_freeaddrinfo (res);
334
    res = NULL;
335

336
    //  Set IP level parameters.
Steven McCoy's avatar
Steven McCoy committed
337
    {
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
        // Multicast loopback disabled by default
        const int multicast_loop = 0;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
              &multicast_loop, sizeof (multicast_loop)))
            goto err_abort;

        const int multicast_hops = options.multicast_hops;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
                &multicast_hops, sizeof (multicast_hops)))
            goto err_abort;

        //  Expedited Forwarding PHB for network elements, no ECN.
        //  Ignore return value due to varied runtime support.
        const int dscp = 0x2e << 2;
        if (AF_INET6 != sa_family)
            pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS,
               &dscp, sizeof (dscp));

        const int nonblocking = 1;
        if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
              &nonblocking, sizeof (nonblocking)))
            goto err_abort;
360
    }
malosek's avatar
malosek committed
361

362 363
    //  Connect PGM transport to start state machine.
    if (!pgm_connect (sock, &pgm_error)) {
364 365

        //  Invalid parameters don't set pgm_error_t.
366 367 368 369
        zmq_assert (pgm_error != NULL);
        goto err_abort;
    }

Martin Sustrik's avatar
Martin Sustrik committed
370 371 372 373 374 375 376 377 378 379
    //  For receiver transport preallocate pgm_msgv array.
    if (receiver) {
        zmq_assert (in_batch_size > 0);
        size_t max_tsdu_size = get_max_tsdu_size ();
        pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
        if ((int) in_batch_size % max_tsdu_size)
            pgm_msgv_len++;
        zmq_assert (pgm_msgv_len);

        pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
380
        alloc_assert (pgm_msgv);
Martin Sustrik's avatar
Martin Sustrik committed
381 382
    }

malosek's avatar
malosek committed
383
    return 0;
Steven McCoy's avatar
Steven McCoy committed
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399

err_abort:
    if (sock != NULL) {
        pgm_close (sock, FALSE);
        sock = NULL;
    }
    if (res != NULL) {
        pgm_freeaddrinfo (res);
        res = NULL;
    }
    if (pgm_error != NULL) {
        pgm_error_free (pgm_error);
        pgm_error = NULL;
    }
    errno = EINVAL;
    return -1;
malosek's avatar
malosek committed
400 401 402 403
}

zmq::pgm_socket_t::~pgm_socket_t ()
{
Martin Sustrik's avatar
Martin Sustrik committed
404 405
    if (pgm_msgv)
        free (pgm_msgv);
406
    if (sock)
Steven McCoy's avatar
Steven McCoy committed
407
        pgm_close (sock, TRUE);
malosek's avatar
malosek committed
408 409
}

410 411
//  Get receiver fds. receive_fd_ is signaled for incoming packets,
//  waiting_pipe_fd_ is signaled for state driven events and data.
412
void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
413
    fd_t *waiting_pipe_fd_)
malosek's avatar
malosek committed
414
{
Steven McCoy's avatar
Steven McCoy committed
415 416 417
    socklen_t socklen;
    bool rc;

418 419 420
    zmq_assert (receive_fd_);
    zmq_assert (waiting_pipe_fd_);

Steven McCoy's avatar
Steven McCoy committed
421
    socklen = sizeof (*receive_fd_);
422 423
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
424 425
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));
426

Steven McCoy's avatar
Steven McCoy committed
427
    socklen = sizeof (*waiting_pipe_fd_);
428 429
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
430 431
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
malosek's avatar
malosek committed
432 433
}

434
//  Get fds and store them into user allocated memory.
Steven McCoy's avatar
Steven McCoy committed
435 436 437 438
//  send_fd is for non-blocking send wire notifications.
//  receive_fd_ is for incoming back-channel protocol packets.
//  rdata_notify_fd_ is raised for waiting repair transmissions.
//  pending_notify_fd_ is for state driven events.
439
void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_,
440
    fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
malosek's avatar
malosek committed
441
{
Steven McCoy's avatar
Steven McCoy committed
442 443 444
    socklen_t socklen;
    bool rc;

malosek's avatar
malosek committed
445 446
    zmq_assert (send_fd_);
    zmq_assert (receive_fd_);
447
    zmq_assert (rdata_notify_fd_);
448
    zmq_assert (pending_notify_fd_);
449

Steven McCoy's avatar
Steven McCoy committed
450 451 452 453 454 455
    socklen = sizeof (*send_fd_);
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*receive_fd_);
456 457
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
458 459 460 461
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*receive_fd_));

    socklen = sizeof (*rdata_notify_fd_);
462 463
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
        &socklen);
Steven McCoy's avatar
Steven McCoy committed
464 465 466 467
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*rdata_notify_fd_));

    socklen = sizeof (*pending_notify_fd_);
468 469
    rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
        pending_notify_fd_, &socklen);
Steven McCoy's avatar
Steven McCoy committed
470 471
    zmq_assert (rc);
    zmq_assert (socklen == sizeof (*pending_notify_fd_));
malosek's avatar
malosek committed
472 473 474
}

//  Send one APDU, transmit window owned memory.
Steven McCoy's avatar
Steven McCoy committed
475
//  data_len_ must be less than one TPDU.
malosek's avatar
malosek committed
476 477
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
malosek's avatar
malosek committed
478
    size_t nbytes = 0;
479

Steven McCoy's avatar
Steven McCoy committed
480
    const int status = pgm_send (sock, data_, data_len_, &nbytes);
malosek's avatar
malosek committed
481

482
    //  We have to write all data as one packet.
483 484
    if (nbytes > 0) {
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
Martin Hurton's avatar
Martin Hurton committed
485
        zmq_assert (nbytes == data_len_);
486 487
    }
    else {
488 489
        zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
            status == PGM_IO_STATUS_WOULD_BLOCK);
490 491 492 493 494 495 496

        if (status == PGM_IO_STATUS_RATE_LIMITED)
            errno = ENOMEM;
        else
            errno = EBUSY;
    }

497
    //  Save return value.
498
    last_tx_status = status;
malosek's avatar
malosek committed
499 500 501 502

    return nbytes;
}

503 504
long zmq::pgm_socket_t::get_rx_timeout ()
{
505 506
    if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
          last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
507 508 509 510
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
511 512 513
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
        last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
        PGM_TIME_REMAIN, &tv, &optlen);
514 515 516 517 518 519 520 521 522 523 524 525 526 527
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

long zmq::pgm_socket_t::get_tx_timeout ()
{
    if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
        return -1;

    struct timeval tv;
    socklen_t optlen = sizeof (tv);
528 529
    const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
        &optlen);
530 531 532 533 534 535 536
    zmq_assert (rc);

    const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);

    return timeout;
}

malosek's avatar
malosek committed
537
//  Return max TSDU size without fragmentation from current PGM transport.
538
size_t zmq::pgm_socket_t::get_max_tsdu_size ()
malosek's avatar
malosek committed
539
{
Steven McCoy's avatar
Steven McCoy committed
540 541 542 543 544 545 546
    int max_tsdu = 0;
    socklen_t optlen = sizeof (max_tsdu);

    bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
    zmq_assert (rc);
    zmq_assert (optlen == sizeof (max_tsdu));
    return (size_t) max_tsdu;
malosek's avatar
malosek committed
547 548
}

549 550
//  pgm_recvmsgv is called to fill the pgm_msgv array up to  pgm_msgv_len.
//  In subsequent calls data from pgm_msgv structure are returned.
malosek's avatar
malosek committed
551
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
malosek's avatar
malosek committed
552
{
malosek's avatar
malosek committed
553 554
    size_t raw_data_len = 0;

555
    //  We just sent all data from pgm_transport_recvmsgv up
malosek's avatar
malosek committed
556 557 558 559 560 561 562
    //  and have to return 0 that another engine in this thread is scheduled.
    if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {

        //  Reset all the counters.
        nbytes_rec = 0;
        nbytes_processed = 0;
        pgm_msgv_processed = 0;
563
        errno = EAGAIN;
564
        return 0;
malosek's avatar
malosek committed
565 566 567
    }

    //  If we have are going first time or if we have processed all pgm_msgv_t
568
    //  structure previously read from the pgm socket.
malosek's avatar
malosek committed
569 570 571 572 573 574 575
    if (nbytes_rec == nbytes_processed) {

        //  Check program flow.
        zmq_assert (pgm_msgv_processed == 0);
        zmq_assert (nbytes_processed == 0);
        zmq_assert (nbytes_rec == 0);

576
        //  Receive a vector of Application Protocol Domain Unit's (APDUs)
malosek's avatar
malosek committed
577
        //  from the transport.
Steven McCoy's avatar
Steven McCoy committed
578
        pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
579

Steven McCoy's avatar
Steven McCoy committed
580 581
        const int status = pgm_recvmsgv (sock, pgm_msgv,
            pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
582

583
        //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
584
        zmq_assert (status != PGM_IO_STATUS_ERROR);
585

586 587
        last_rx_status = status;

malosek's avatar
malosek committed
588
        //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
Steven McCoy's avatar
Steven McCoy committed
589
        //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
590 591
        if (status == PGM_IO_STATUS_TIMER_PENDING) {

592 593
            zmq_assert (nbytes_rec == 0);

594
            //  In case if no RDATA/ODATA caused POLLIN 0 is
malosek's avatar
malosek committed
595 596
            //  returned.
            nbytes_rec = 0;
597
            errno = EBUSY;
598
            return 0;
malosek's avatar
malosek committed
599
        }
600

Steven McCoy's avatar
Steven McCoy committed
601 602 603 604 605
        //  Send SPMR, NAK, ACK is rate limited.
        if (status == PGM_IO_STATUS_RATE_LIMITED) {

            zmq_assert (nbytes_rec == 0);

606
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
607
            nbytes_rec = 0;
608 609
            errno = ENOMEM;
            return 0;
Steven McCoy's avatar
Steven McCoy committed
610 611 612 613 614 615 616
        }

        //  No peers and hence no incoming packets.
        if (status == PGM_IO_STATUS_WOULD_BLOCK) {

            zmq_assert (nbytes_rec == 0);

617
            //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
Steven McCoy's avatar
Steven McCoy committed
618
            nbytes_rec = 0;
619
            errno = EAGAIN;
620
            return 0;
Steven McCoy's avatar
Steven McCoy committed
621 622
        }

623 624 625
        //  Data loss.
        if (status == PGM_IO_STATUS_RESET) {

626
            struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
627 628

            //  Save lost data TSI.
Steven McCoy's avatar
Steven McCoy committed
629
            *tsi_ = &skb->tsi;
630 631 632
            nbytes_rec = 0;

            //  In case of dala loss -1 is returned.
633
            errno = EINVAL;
Steven McCoy's avatar
Steven McCoy committed
634
            pgm_free_skb (skb);
635 636 637
            return -1;
        }

638
        zmq_assert (status == PGM_IO_STATUS_NORMAL);
malosek's avatar
malosek committed
639
    }
640 641 642 643
    else
    {
        zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
    }
malosek's avatar
malosek committed
644

645
    // Zero byte payloads are valid in PGM, but not 0MQ protocol.
malosek's avatar
malosek committed
646 647
    zmq_assert (nbytes_rec > 0);

malosek's avatar
malosek committed
648 649
    // Only one APDU per pgm_msgv_t structure is allowed.
    zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
650 651

    struct pgm_sk_buff_t* skb =
malosek's avatar
malosek committed
652 653 654 655 656 657 658 659
        pgm_msgv [pgm_msgv_processed].msgv_skb [0];

    //  Take pointers from pgm_msgv_t structure.
    *raw_data_ = skb->data;
    raw_data_len = skb->len;

    //  Save current TSI.
    *tsi_ = &skb->tsi;
malosek's avatar
malosek committed
660 661 662

    //  Move the the next pgm_msgv_t structure.
    pgm_msgv_processed++;
663
    zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
malosek's avatar
malosek committed
664 665 666 667 668
    nbytes_processed +=raw_data_len;

    return raw_data_len;
}

669
void zmq::pgm_socket_t::process_upstream ()
malosek's avatar
malosek committed
670
{
malosek's avatar
malosek committed
671
    pgm_msgv_t dummy_msg;
malosek's avatar
malosek committed
672

malosek's avatar
malosek committed
673
    size_t dummy_bytes = 0;
Steven McCoy's avatar
Steven McCoy committed
674
    pgm_error_t *pgm_error = NULL;
malosek's avatar
malosek committed
675

Steven McCoy's avatar
Steven McCoy committed
676 677
    const int status = pgm_recvmsgv (sock, &dummy_msg,
        1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
malosek's avatar
malosek committed
678

679
    //  Invalid parameters.
Martin Sustrik's avatar
Martin Sustrik committed
680
    zmq_assert (status != PGM_IO_STATUS_ERROR);
681

malosek's avatar
malosek committed
682
    //  No data should be returned.
683
    zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
684 685
        status == PGM_IO_STATUS_RATE_LIMITED ||
        status == PGM_IO_STATUS_WOULD_BLOCK));
686 687 688 689 690

    last_rx_status = status;

    if (status == PGM_IO_STATUS_TIMER_PENDING)
        errno = EBUSY;
691 692
    else
    if (status == PGM_IO_STATUS_RATE_LIMITED)
693 694 695
        errno = ENOMEM;
    else
        errno = EAGAIN;
malosek's avatar
malosek committed
696 697
}

698 699 700
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
    //  Convert rate into B/ms.
701
    uint64_t rate = uint64_t (options.rate) / 8;
702

703
    //  Compute the size of the buffer in bytes.
704
    uint64_t size = uint64_t (options.recovery_ivl) * rate;
705 706 707 708

    //  Translate the size into number of packets.
    uint64_t sqns = size / tpdu_;

709
    //  Buffer should be able to hold at least one packet.
710 711 712
    if (sqns == 0)
        sqns = 1;

713
    return (int) sqns;
714 715
}

malosek's avatar
malosek committed
716 717
#endif