pgm_sender.cpp 7.09 KB
Newer Older
malosek's avatar
malosek committed
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#if defined ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include <stdlib.h>
malosek's avatar
malosek committed
35 36 37

#include "io_thread.hpp"
#include "pgm_sender.hpp"
38
#include "session_base.hpp"
malosek's avatar
malosek committed
39 40
#include "err.hpp"
#include "wire.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41
#include "stdint.hpp"
42
#include "macros.hpp"
malosek's avatar
malosek committed
43

44
//  Constructs the PGM sender engine.
//
//  parent_  - I/O thread passed to the io_object_t base.
//  options_ - socket options; stored in 'options' and also handed to the
//             embedded pgm_socket.
//
//  Nothing is allocated here: both timers are marked as not running, there
//  is no session attached and the output buffer is NULL/empty until
//  init () is called.
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
                                 const options_t &options_) :
    io_object_t (parent_),
    has_tx_timer (false),
    has_rx_timer (false),
    session (NULL),
    encoder (0),
    more_flag (false),
    pgm_socket (false, options_),
    options (options_),
    out_buffer (NULL),
    out_buffer_size (0),
    write_size (0)
{
    //  Initialise the message object that out_event () later fills via
    //  session->pull_msg (). It is closed again in the destructor.
    int rc = msg.init ();
    errno_assert (rc == 0);
}

62
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
malosek's avatar
malosek committed
63
{
Martin Sustrik's avatar
Martin Sustrik committed
64 65 66 67 68
    int rc = pgm_socket.init (udp_encapsulation_, network_);
    if (rc != 0)
        return rc;

    out_buffer_size = pgm_socket.get_max_tsdu_size ();
69
    out_buffer = (unsigned char *) malloc (out_buffer_size);
70
    alloc_assert (out_buffer);
71 72

    return rc;
malosek's avatar
malosek committed
73 74
}

75
//  Attaches the engine to a session and registers the PGM socket's file
//  descriptors with the poller.
//
//  io_thread_ - unused.
//  session_   - session that out_event () pulls messages from.
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    LIBZMQ_UNUSED (io_thread_);

    //  The PGM socket exposes four fds on the sender side.
    fd_t downlink_socket_fd = retired_fd;
    fd_t uplink_socket_fd = retired_fd;
    fd_t rdata_notify_fd = retired_fd;
    fd_t pending_notify_fd = retired_fd;

    session = session_;

    //  Fill fds from PGM transport and add them to the poller.
    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
                               &rdata_notify_fd, &pending_notify_fd);

    handle = add_fd (downlink_socket_fd);
    uplink_handle = add_fd (uplink_socket_fd);
    rdata_notify_handle = add_fd (rdata_notify_fd);
    pending_notify_handle = add_fd (pending_notify_fd);

    //  Set POLLIN. We never want to stop polling the uplink, because we
    //  never want to stop processing NAKs.
    set_pollin (uplink_handle);
    set_pollin (rdata_notify_handle);
    set_pollin (pending_notify_handle);

    //  Set POLLOUT for the downlink socket handle.
    set_pollout (handle);
}

void zmq::pgm_sender_t::unplug ()
{
107 108 109 110 111 112 113 114 115 116
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
        has_tx_timer = false;
    }

malosek's avatar
malosek committed
117 118
    rm_fd (handle);
    rm_fd (uplink_handle);
malosek's avatar
malosek committed
119
    rm_fd (rdata_notify_handle);
120
    rm_fd (pending_notify_handle);
121
    session = NULL;
malosek's avatar
malosek committed
122 123
}

124 125 126 127 128 129
//  Shuts the engine down: unplug () detaches it from the poller and the
//  session, then the object destroys itself. The object must not be used
//  once this function has been called.
void zmq::pgm_sender_t::terminate ()
{
    this->unplug ();
    delete this;
}

130
void zmq::pgm_sender_t::restart_output ()
malosek's avatar
malosek committed
131 132
{
    set_pollout (handle);
133
    out_event ();
malosek's avatar
malosek committed
134 135
}

136
void zmq::pgm_sender_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
137 138 139 140
{
    zmq_assert (false);
}

141 142 143 144 145
//  Returns the endpoint string for this engine, which is always the empty
//  string.
const char *zmq::pgm_sender_t::get_endpoint () const
{
    const char *const no_endpoint = "";
    return no_endpoint;
}

malosek's avatar
malosek committed
146 147
//  Releases what the constructor and init () acquired: closes the message
//  object and frees the output buffer if one was allocated.
zmq::pgm_sender_t::~pgm_sender_t ()
{
    int rc = msg.close ();
    errno_assert (rc == 0);

    //  out_buffer stays NULL when init () was never called or failed.
    if (out_buffer) {
        free (out_buffer);
        out_buffer = NULL;
    }
}

void zmq::pgm_sender_t::in_event ()
{
159 160 161 162 163
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

164
    //  In-event on sender side means NAK or SPMR receiving from some peer.
malosek's avatar
malosek committed
165
    pgm_socket.process_upstream ();
166
    if (errno == ENOMEM || errno == EBUSY) {
167 168 169 170
        const long timeout = pgm_socket.get_rx_timeout ();
        add_timer (timeout, rx_timer_id);
        has_rx_timer = true;
    }
malosek's avatar
malosek committed
171 172 173 174
}

void zmq::pgm_sender_t::out_event ()
{
175
    //  POLLOUT event from send socket. If write buffer is empty,
malosek's avatar
malosek committed
176
    //  try to read new data from the encoder.
Martin Sustrik's avatar
Martin Sustrik committed
177
    if (write_size == 0) {
178
        //  First two bytes (sizeof uint16_t) are used to store message
Martin Sustrik's avatar
Martin Sustrik committed
179 180
        //  offset in following steps. Note that by passing our buffer to
        //  the get data function we prevent it from returning its own buffer.
Martin Sustrik's avatar
Martin Sustrik committed
181
        unsigned char *bf = out_buffer + sizeof (uint16_t);
Martin Sustrik's avatar
Martin Sustrik committed
182
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
183 184 185 186 187
        uint16_t offset = 0xffff;

        size_t bytes = encoder.encode (&bf, bfsz);
        while (bytes < bfsz) {
            if (!more_flag && offset == 0xffff)
188
                offset = static_cast<uint16_t> (bytes);
189 190 191 192 193 194 195 196
            int rc = session->pull_msg (&msg);
            if (rc == -1)
                break;
            more_flag = msg.flags () & msg_t::more;
            encoder.load_msg (&msg);
            bf = out_buffer + sizeof (uint16_t) + bytes;
            bytes += encoder.encode (&bf, bfsz - bytes);
        }
malosek's avatar
malosek committed
197 198

        //  If there are no data to write stop polling for output.
199
        if (bytes == 0) {
malosek's avatar
malosek committed
200
            reset_pollout (handle);
Martin Sustrik's avatar
Martin Sustrik committed
201
            return;
malosek's avatar
malosek committed
202 203
        }

204 205
        write_size = sizeof (uint16_t) + bytes;

Martin Sustrik's avatar
Martin Sustrik committed
206
        //  Put offset information in the buffer.
207
        put_uint16 (out_buffer, offset);
malosek's avatar
malosek committed
208 209
    }

210 211
    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
212
        set_pollout (handle);
213
        has_tx_timer = false;
214 215
    }

Martin Sustrik's avatar
Martin Sustrik committed
216 217
    //  Send the data.
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
malosek's avatar
malosek committed
218

Martin Sustrik's avatar
Martin Sustrik committed
219
    //  We can write either all data or 0 which means rate limit reached.
220
    if (nbytes == write_size)
Martin Sustrik's avatar
Martin Sustrik committed
221
        write_size = 0;
222
    else {
Martin Sustrik's avatar
Martin Sustrik committed
223
        zmq_assert (nbytes == 0);
224 225

        if (errno == ENOMEM) {
226
            // Stop polling handle and wait for tx timeout
227 228
            const long timeout = pgm_socket.get_tx_timeout ();
            add_timer (timeout, tx_timer_id);
229
            reset_pollout (handle);
230
            has_tx_timer = true;
231
        } else
232
            errno_assert (errno == EBUSY);
233 234 235 236 237
    }
}

void zmq::pgm_sender_t::timer_event (int token)
{
238 239 240
    //  Timer cancels on return by poller_base.
    if (token == rx_timer_id) {
        has_rx_timer = false;
241
        in_event ();
242
    } else if (token == tx_timer_id) {
243
        // Restart polling handle and retry sending
244
        has_tx_timer = false;
245
        set_pollout (handle);
246
        out_event ();
247
    } else
248
        zmq_assert (false);
malosek's avatar
malosek committed
249
}
Martin Sustrik's avatar
Martin Sustrik committed
250

malosek's avatar
malosek committed
251
#endif