pgm_sender.cpp 7.31 KB
Newer Older
malosek's avatar
malosek committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
malosek's avatar
malosek committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
malosek's avatar
malosek committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
malosek's avatar
malosek committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
malosek's avatar
malosek committed
25

26
    You should have received a copy of the GNU Lesser General Public License
malosek's avatar
malosek committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#if defined ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include <stdlib.h>
malosek's avatar
malosek committed
35 36 37

#include "io_thread.hpp"
#include "pgm_sender.hpp"
38
#include "session_base.hpp"
malosek's avatar
malosek committed
39 40
#include "err.hpp"
#include "wire.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41
#include "stdint.hpp"
42
#include "macros.hpp"
malosek's avatar
malosek committed
43

44
//  Constructs a sender engine attached to the given I/O thread. Every member
//  is put into an idle state here; the PGM socket itself is only set up
//  later by init ().
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
                                 const options_t &options_) :
    io_object_t (parent_),
    //  No timers are armed yet.
    has_tx_timer (false),
    has_rx_timer (false),
    //  The session is supplied by plug ().
    session (NULL),
    encoder (0),
    more_flag (false),
    pgm_socket (false, options_),
    options (options_),
    //  Poller handles stay null until plug () registers the descriptors.
    handle (static_cast<handle_t> (NULL)),
    uplink_handle (static_cast<handle_t> (NULL)),
    rdata_notify_handle (static_cast<handle_t> (NULL)),
    pending_notify_handle (static_cast<handle_t> (NULL)),
    //  The output buffer is allocated in init ().
    out_buffer (NULL),
    out_buffer_size (0),
    write_size (0)
{
    //  Prepare the message object that out_event () pulls into.
    const int init_rc = msg.init ();
    errno_assert (init_rc == 0);
}

66
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
malosek's avatar
malosek committed
67
{
Martin Sustrik's avatar
Martin Sustrik committed
68 69 70 71 72
    int rc = pgm_socket.init (udp_encapsulation_, network_);
    if (rc != 0)
        return rc;

    out_buffer_size = pgm_socket.get_max_tsdu_size ();
73
    out_buffer = (unsigned char *) malloc (out_buffer_size);
74
    alloc_assert (out_buffer);
75 76

    return rc;
malosek's avatar
malosek committed
77 78
}

79
//  Attaches the engine to a session and registers the PGM socket's file
//  descriptors with the poller. The io_thread_ argument is not used.
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    LIBZMQ_UNUSED (io_thread_);

    session = session_;

    //  The PGM socket exposes four descriptors on the sender side; start
    //  from the retired value and let the socket fill them in.
    fd_t downlink_fd = retired_fd;
    fd_t uplink_fd = retired_fd;
    fd_t rdata_fd = retired_fd;
    fd_t pending_fd = retired_fd;
    pgm_socket.get_sender_fds (&downlink_fd, &uplink_fd, &rdata_fd,
                               &pending_fd);

    //  Register all four descriptors with the poller.
    handle = add_fd (downlink_fd);
    uplink_handle = add_fd (uplink_fd);
    rdata_notify_handle = add_fd (rdata_fd);
    pending_notify_handle = add_fd (pending_fd);

    //  Input polling on the uplink and notification descriptors is never
    //  switched off: we always want to keep processing NAKs.
    set_pollin (uplink_handle);
    set_pollin (rdata_notify_handle);
    set_pollin (pending_notify_handle);

    //  Start polling the downlink descriptor for writability.
    set_pollout (handle);
}

void zmq::pgm_sender_t::unplug ()
{
111 112 113 114 115 116 117 118 119 120
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
        has_tx_timer = false;
    }

malosek's avatar
malosek committed
121 122
    rm_fd (handle);
    rm_fd (uplink_handle);
malosek's avatar
malosek committed
123
    rm_fd (rdata_notify_handle);
124
    rm_fd (pending_notify_handle);
125
    session = NULL;
malosek's avatar
malosek committed
126 127
}

128 129 130 131 132 133
//  Shuts the engine down: detaches it via unplug () and then destroys the
//  object itself.
void zmq::pgm_sender_t::terminate ()
{
    unplug ();
    //  The object deletes itself; no member may be touched after this line.
    delete this;
}

134
void zmq::pgm_sender_t::restart_output ()
malosek's avatar
malosek committed
135 136
{
    set_pollout (handle);
137
    out_event ();
malosek's avatar
malosek committed
138 139
}

140
bool zmq::pgm_sender_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
141 142
{
    zmq_assert (false);
143
    return true;
Martin Hurton's avatar
Martin Hurton committed
144 145
}

146 147 148 149 150
//  Returns the endpoint string for this engine; this implementation always
//  returns an empty string literal.
const char *zmq::pgm_sender_t::get_endpoint () const
{
    return "";
}

malosek's avatar
malosek committed
151 152
//  Releases the message object and the output buffer allocated by init ().
zmq::pgm_sender_t::~pgm_sender_t ()
{
    const int close_rc = msg.close ();
    errno_assert (close_rc == 0);

    //  free () is a no-op on a null pointer, so no guard is needed when
    //  init () never ran or failed before allocating.
    free (out_buffer);
    out_buffer = NULL;
}

void zmq::pgm_sender_t::in_event ()
{
164 165 166 167 168
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

169
    //  In-event on sender side means NAK or SPMR receiving from some peer.
malosek's avatar
malosek committed
170
    pgm_socket.process_upstream ();
171
    if (errno == ENOMEM || errno == EBUSY) {
172 173 174 175
        const long timeout = pgm_socket.get_rx_timeout ();
        add_timer (timeout, rx_timer_id);
        has_rx_timer = true;
    }
malosek's avatar
malosek committed
176 177 178 179
}

void zmq::pgm_sender_t::out_event ()
{
180
    //  POLLOUT event from send socket. If write buffer is empty,
malosek's avatar
malosek committed
181
    //  try to read new data from the encoder.
Martin Sustrik's avatar
Martin Sustrik committed
182
    if (write_size == 0) {
183
        //  First two bytes (sizeof uint16_t) are used to store message
Martin Sustrik's avatar
Martin Sustrik committed
184 185
        //  offset in following steps. Note that by passing our buffer to
        //  the get data function we prevent it from returning its own buffer.
Martin Sustrik's avatar
Martin Sustrik committed
186
        unsigned char *bf = out_buffer + sizeof (uint16_t);
Martin Sustrik's avatar
Martin Sustrik committed
187
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
188 189 190 191 192
        uint16_t offset = 0xffff;

        size_t bytes = encoder.encode (&bf, bfsz);
        while (bytes < bfsz) {
            if (!more_flag && offset == 0xffff)
193
                offset = static_cast<uint16_t> (bytes);
194 195 196 197 198 199 200 201
            int rc = session->pull_msg (&msg);
            if (rc == -1)
                break;
            more_flag = msg.flags () & msg_t::more;
            encoder.load_msg (&msg);
            bf = out_buffer + sizeof (uint16_t) + bytes;
            bytes += encoder.encode (&bf, bfsz - bytes);
        }
malosek's avatar
malosek committed
202 203

        //  If there are no data to write stop polling for output.
204
        if (bytes == 0) {
malosek's avatar
malosek committed
205
            reset_pollout (handle);
Martin Sustrik's avatar
Martin Sustrik committed
206
            return;
malosek's avatar
malosek committed
207 208
        }

209 210
        write_size = sizeof (uint16_t) + bytes;

Martin Sustrik's avatar
Martin Sustrik committed
211
        //  Put offset information in the buffer.
212
        put_uint16 (out_buffer, offset);
malosek's avatar
malosek committed
213 214
    }

215 216
    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
217
        set_pollout (handle);
218
        has_tx_timer = false;
219 220
    }

Martin Sustrik's avatar
Martin Sustrik committed
221 222
    //  Send the data.
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
malosek's avatar
malosek committed
223

Martin Sustrik's avatar
Martin Sustrik committed
224
    //  We can write either all data or 0 which means rate limit reached.
225
    if (nbytes == write_size)
Martin Sustrik's avatar
Martin Sustrik committed
226
        write_size = 0;
227
    else {
Martin Sustrik's avatar
Martin Sustrik committed
228
        zmq_assert (nbytes == 0);
229 230

        if (errno == ENOMEM) {
231
            // Stop polling handle and wait for tx timeout
232 233
            const long timeout = pgm_socket.get_tx_timeout ();
            add_timer (timeout, tx_timer_id);
234
            reset_pollout (handle);
235
            has_tx_timer = true;
236
        } else
237
            errno_assert (errno == EBUSY);
238 239 240 241 242
    }
}

void zmq::pgm_sender_t::timer_event (int token)
{
243 244 245
    //  Timer cancels on return by poller_base.
    if (token == rx_timer_id) {
        has_rx_timer = false;
246
        in_event ();
247
    } else if (token == tx_timer_id) {
248
        // Restart polling handle and retry sending
249
        has_tx_timer = false;
250
        set_pollout (handle);
251
        out_event ();
252
    } else
253
        zmq_assert (false);
malosek's avatar
malosek committed
254
}
Martin Sustrik's avatar
Martin Sustrik committed
255

malosek's avatar
malosek committed
256
#endif