pgm_sender.cpp 7.07 KB
Newer Older
malosek's avatar
malosek committed
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#if defined ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include <stdlib.h>
malosek's avatar
malosek committed
35 36 37

#include "io_thread.hpp"
#include "pgm_sender.hpp"
38
#include "session_base.hpp"
malosek's avatar
malosek committed
39 40
#include "err.hpp"
#include "wire.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41
#include "stdint.hpp"
42
#include "macros.hpp"
malosek's avatar
malosek committed
43

44
//  Constructs the sender in its idle state: neither timer is marked as
//  armed, no session is attached, and the output buffer is left unallocated
//  (init () allocates it). The options are forwarded to the PGM socket and
//  also stored in the 'options' member.
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
                                 const options_t &options_) :
    io_object_t (parent_),
    has_tx_timer (false),
    has_rx_timer (false),
    session (NULL),
    encoder (0),
    more_flag (false),
    pgm_socket (false, options_),
    options (options_),
    out_buffer (NULL),
    out_buffer_size (0),
    write_size (0)
{
    //  The scratch message must be initialised before it can be handed to
    //  the session in out_event (); errno_assert enforces that it succeeded.
    const int rc = msg.init ();
    errno_assert (rc == 0);
}

62
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
malosek's avatar
malosek committed
63
{
Martin Sustrik's avatar
Martin Sustrik committed
64 65 66 67 68 69
    int rc = pgm_socket.init (udp_encapsulation_, network_);
    if (rc != 0)
        return rc;

    out_buffer_size = pgm_socket.get_max_tsdu_size ();
    out_buffer = (unsigned char*) malloc (out_buffer_size);
70
    alloc_assert (out_buffer);
71 72

    return rc;
malosek's avatar
malosek committed
73 74
}

75
//  Attaches the engine to a session and registers the PGM socket's file
//  descriptors with the poller. The io_thread_ argument is not used.
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    LIBZMQ_UNUSED (io_thread_);

    session = session_;

    //  The PGM socket exposes four descriptors on the sender side; start
    //  them all off as retired_fd and let the socket fill them in.
    fd_t downlink_fd = retired_fd;
    fd_t uplink_fd = retired_fd;
    fd_t rdata_fd = retired_fd;
    fd_t pending_fd = retired_fd;
    pgm_socket.get_sender_fds (&downlink_fd, &uplink_fd, &rdata_fd,
                               &pending_fd);

    //  Register every descriptor with the poller and remember the handles.
    handle = add_fd (downlink_fd);
    uplink_handle = add_fd (uplink_fd);
    rdata_notify_handle = add_fd (rdata_fd);
    pending_notify_handle = add_fd (pending_fd);

    //  Set POLLIN. We never want to stop polling the uplink, i.e. we never
    //  want to stop processing NAKs.
    set_pollin (uplink_handle);
    set_pollin (rdata_notify_handle);
    set_pollin (pending_notify_handle);

    //  Set POLLOUT on the downlink handle so that out_event () gets called.
    set_pollout (handle);
}

void zmq::pgm_sender_t::unplug ()
{
107 108 109 110 111 112 113 114 115 116
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
        has_tx_timer = false;
    }

malosek's avatar
malosek committed
117 118
    rm_fd (handle);
    rm_fd (uplink_handle);
malosek's avatar
malosek committed
119
    rm_fd (rdata_notify_handle);
120
    rm_fd (pending_notify_handle);
121
    session = NULL;
malosek's avatar
malosek committed
122 123
}

124 125 126 127 128 129
//  Shuts the engine down: unplugs it and then destroys the object itself.
//  The object must not be touched after this call returns.
void zmq::pgm_sender_t::terminate ()
{
    unplug ();

    //  The engine deletes itself here.
    delete this;
}

130
void zmq::pgm_sender_t::restart_output ()
malosek's avatar
malosek committed
131 132
{
    set_pollout (handle);
133
    out_event ();
malosek's avatar
malosek committed
134 135
}

136
void zmq::pgm_sender_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
137 138 139 140
{
    zmq_assert (false);
}

141 142 143 144 145
//  Returns the endpoint string for this engine, which is always the empty
//  string.
const char *zmq::pgm_sender_t::get_endpoint () const
{
    return "";
}

malosek's avatar
malosek committed
146 147
//  Closes the scratch message and releases the output buffer allocated in
//  init ().
zmq::pgm_sender_t::~pgm_sender_t ()
{
    const int rc = msg.close ();
    errno_assert (rc == 0);

    //  free () accepts a null pointer, so no guard is needed when init ()
    //  never allocated the buffer.
    free (out_buffer);
    out_buffer = NULL;
}

void zmq::pgm_sender_t::in_event ()
{
159 160 161 162 163
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

164
    //  In-event on sender side means NAK or SPMR receiving from some peer.
malosek's avatar
malosek committed
165
    pgm_socket.process_upstream ();
166
    if (errno == ENOMEM || errno == EBUSY) {
167 168 169 170
        const long timeout = pgm_socket.get_rx_timeout ();
        add_timer (timeout, rx_timer_id);
        has_rx_timer = true;
    }
malosek's avatar
malosek committed
171 172 173 174
}

void zmq::pgm_sender_t::out_event ()
{
175
    //  POLLOUT event from send socket. If write buffer is empty,
malosek's avatar
malosek committed
176
    //  try to read new data from the encoder.
Martin Sustrik's avatar
Martin Sustrik committed
177
    if (write_size == 0) {
malosek's avatar
malosek committed
178

179
        //  First two bytes (sizeof uint16_t) are used to store message
Martin Sustrik's avatar
Martin Sustrik committed
180 181
        //  offset in following steps. Note that by passing our buffer to
        //  the get data function we prevent it from returning its own buffer.
Martin Sustrik's avatar
Martin Sustrik committed
182
        unsigned char *bf = out_buffer + sizeof (uint16_t);
Martin Sustrik's avatar
Martin Sustrik committed
183
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
184 185 186 187 188 189 190 191 192 193 194 195 196 197
        uint16_t offset = 0xffff;

        size_t bytes = encoder.encode (&bf, bfsz);
        while (bytes < bfsz) {
            if (!more_flag && offset == 0xffff)
                offset = static_cast <uint16_t> (bytes);
            int rc = session->pull_msg (&msg);
            if (rc == -1)
                break;
            more_flag = msg.flags () & msg_t::more;
            encoder.load_msg (&msg);
            bf = out_buffer + sizeof (uint16_t) + bytes;
            bytes += encoder.encode (&bf, bfsz - bytes);
        }
malosek's avatar
malosek committed
198 199

        //  If there are no data to write stop polling for output.
200
        if (bytes == 0) {
malosek's avatar
malosek committed
201
            reset_pollout (handle);
Martin Sustrik's avatar
Martin Sustrik committed
202
            return;
malosek's avatar
malosek committed
203 204
        }

205 206
        write_size = sizeof (uint16_t) + bytes;

Martin Sustrik's avatar
Martin Sustrik committed
207
        //  Put offset information in the buffer.
208
        put_uint16 (out_buffer, offset);
malosek's avatar
malosek committed
209 210
    }

211 212
    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
213
        set_pollout (handle);
214
        has_tx_timer = false;
215 216
    }

Martin Sustrik's avatar
Martin Sustrik committed
217 218
    //  Send the data.
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
malosek's avatar
malosek committed
219

Martin Sustrik's avatar
Martin Sustrik committed
220
    //  We can write either all data or 0 which means rate limit reached.
221
    if (nbytes == write_size)
Martin Sustrik's avatar
Martin Sustrik committed
222
        write_size = 0;
223
    else {
Martin Sustrik's avatar
Martin Sustrik committed
224
        zmq_assert (nbytes == 0);
225 226

        if (errno == ENOMEM) {
227
            // Stop polling handle and wait for tx timeout
228 229
            const long timeout = pgm_socket.get_tx_timeout ();
            add_timer (timeout, tx_timer_id);
230
            reset_pollout (handle);
231
            has_tx_timer = true;
232 233
        }
        else
234
            errno_assert (errno == EBUSY);
235 236 237 238 239
    }
}

void zmq::pgm_sender_t::timer_event (int token)
{
240 241 242
    //  Timer cancels on return by poller_base.
    if (token == rx_timer_id) {
        has_rx_timer = false;
243
        in_event ();
244 245 246
    }
    else
    if (token == tx_timer_id) {
247
        // Restart polling handle and retry sending
248
        has_tx_timer = false;
249
        set_pollout (handle);
250
        out_event ();
251 252
    }
    else
253
        zmq_assert (false);
malosek's avatar
malosek committed
254
}
Martin Sustrik's avatar
Martin Sustrik committed
255

malosek's avatar
malosek committed
256 257
#endif