pgm_sender.cpp 6.94 KB
Newer Older
malosek's avatar
malosek committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
malosek's avatar
malosek committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
malosek's avatar
malosek committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
malosek's avatar
malosek committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
malosek's avatar
malosek committed
25

26
    You should have received a copy of the GNU Lesser General Public License
malosek's avatar
malosek committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
malosek's avatar
malosek committed
31

malosek's avatar
malosek committed
32
#if defined ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33

Martin Sustrik's avatar
Martin Sustrik committed
34
#include <stdlib.h>
malosek's avatar
malosek committed
35 36 37

#include "io_thread.hpp"
#include "pgm_sender.hpp"
38
#include "session_base.hpp"
malosek's avatar
malosek committed
39 40
#include "err.hpp"
#include "wire.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
41
#include "stdint.hpp"
malosek's avatar
malosek committed
42

43
//  Constructs the sender engine. parent_ is forwarded to the io_object_t
//  base and options_ is both stored and handed to the PGM socket. The engine
//  starts with no timers armed, no session attached and no output buffer;
//  the buffer is allocated later by init ().
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
                                 const options_t &options_) :
    io_object_t (parent_),
    has_tx_timer (false),
    has_rx_timer (false),
    session (NULL),
    encoder (0),
    more_flag (false),
    pgm_socket (false, options_),
    options (options_),
    out_buffer (NULL),
    out_buffer_size (0),
    write_size (0)
{
    //  The reusable message object must be initialised before out_event ()
    //  hands it to the session; a failure here is treated as fatal.
    const int rc = msg.init ();
    errno_assert (rc == 0);
}

61
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
malosek's avatar
malosek committed
62
{
Martin Sustrik's avatar
Martin Sustrik committed
63 64 65 66 67 68
    int rc = pgm_socket.init (udp_encapsulation_, network_);
    if (rc != 0)
        return rc;

    out_buffer_size = pgm_socket.get_max_tsdu_size ();
    out_buffer = (unsigned char*) malloc (out_buffer_size);
69
    alloc_assert (out_buffer);
70 71

    return rc;
malosek's avatar
malosek committed
72 73
}

74
//  Attaches the engine to session_ and registers the PGM socket's file
//  descriptors with the poller.
//
//  io_thread_ is part of the engine interface but is not needed here.
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    //  Reference the parameter explicitly so that builds with
    //  unused-parameter warnings enabled stay clean.
    static_cast<void> (io_thread_);

    //  The PGM sender uses four fds: the downlink (data out), the uplink
    //  (feedback in) and two notification fds. All start out as retired
    //  until the socket fills them in.
    fd_t downlink_socket_fd = retired_fd;
    fd_t uplink_socket_fd = retired_fd;
    fd_t rdata_notify_fd = retired_fd;
    fd_t pending_notify_fd = retired_fd;

    session = session_;

    //  Fill fds from PGM transport and add them to the poller.
    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
                               &rdata_notify_fd, &pending_notify_fd);

    handle = add_fd (downlink_socket_fd);
    uplink_handle = add_fd (uplink_socket_fd);
    rdata_notify_handle = add_fd (rdata_notify_fd);
    pending_notify_handle = add_fd (pending_notify_fd);

    //  Set POLLIN. We never want to stop polling the uplink, i.e. we never
    //  want to stop processing NAKs.
    set_pollin (uplink_handle);
    set_pollin (rdata_notify_handle);
    set_pollin (pending_notify_handle);

    //  Set POLLOUT for the downlink socket handle.
    set_pollout (handle);
}

void zmq::pgm_sender_t::unplug ()
{
105 106 107 108 109 110 111 112 113 114
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
        has_tx_timer = false;
    }

malosek's avatar
malosek committed
115 116
    rm_fd (handle);
    rm_fd (uplink_handle);
malosek's avatar
malosek committed
117
    rm_fd (rdata_notify_handle);
118
    rm_fd (pending_notify_handle);
119
    session = NULL;
malosek's avatar
malosek committed
120 121
}

122 123 124 125 126 127
//  Shuts the engine down: unplugs it and then destroys the object itself.
//  The object must not be touched after this call returns.
void zmq::pgm_sender_t::terminate ()
{
    unplug ();

    //  The engine deletes itself here.
    delete this;
}

128
void zmq::pgm_sender_t::restart_output ()
malosek's avatar
malosek committed
129 130
{
    set_pollout (handle);
131
    out_event ();
malosek's avatar
malosek committed
132 133
}

134
void zmq::pgm_sender_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
135 136 137 138
{
    zmq_assert (false);
}

malosek's avatar
malosek committed
139 140
//  Releases the message object and the output buffer allocated by init ().
zmq::pgm_sender_t::~pgm_sender_t ()
{
    const int rc = msg.close ();
    errno_assert (rc == 0);

    //  free () is a no-op for a null pointer, so no guard is needed when
    //  init () was never called or failed before allocating.
    free (out_buffer);
    out_buffer = NULL;
}

void zmq::pgm_sender_t::in_event ()
{
152 153 154 155 156
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

157
    //  In-event on sender side means NAK or SPMR receiving from some peer.
malosek's avatar
malosek committed
158
    pgm_socket.process_upstream ();
159
    if (errno == ENOMEM || errno == EBUSY) {
160 161 162 163
        const long timeout = pgm_socket.get_rx_timeout ();
        add_timer (timeout, rx_timer_id);
        has_rx_timer = true;
    }
malosek's avatar
malosek committed
164 165 166 167
}

void zmq::pgm_sender_t::out_event ()
{
168
    //  POLLOUT event from send socket. If write buffer is empty,
malosek's avatar
malosek committed
169
    //  try to read new data from the encoder.
Martin Sustrik's avatar
Martin Sustrik committed
170
    if (write_size == 0) {
malosek's avatar
malosek committed
171

172
        //  First two bytes (sizeof uint16_t) are used to store message
Martin Sustrik's avatar
Martin Sustrik committed
173 174
        //  offset in following steps. Note that by passing our buffer to
        //  the get data function we prevent it from returning its own buffer.
Martin Sustrik's avatar
Martin Sustrik committed
175
        unsigned char *bf = out_buffer + sizeof (uint16_t);
Martin Sustrik's avatar
Martin Sustrik committed
176
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
177 178 179 180 181 182 183 184 185 186 187 188 189 190
        uint16_t offset = 0xffff;

        size_t bytes = encoder.encode (&bf, bfsz);
        while (bytes < bfsz) {
            if (!more_flag && offset == 0xffff)
                offset = static_cast <uint16_t> (bytes);
            int rc = session->pull_msg (&msg);
            if (rc == -1)
                break;
            more_flag = msg.flags () & msg_t::more;
            encoder.load_msg (&msg);
            bf = out_buffer + sizeof (uint16_t) + bytes;
            bytes += encoder.encode (&bf, bfsz - bytes);
        }
malosek's avatar
malosek committed
191 192

        //  If there are no data to write stop polling for output.
193
        if (bytes == 0) {
malosek's avatar
malosek committed
194
            reset_pollout (handle);
Martin Sustrik's avatar
Martin Sustrik committed
195
            return;
malosek's avatar
malosek committed
196 197
        }

198 199
        write_size = sizeof (uint16_t) + bytes;

Martin Sustrik's avatar
Martin Sustrik committed
200
        //  Put offset information in the buffer.
201
        put_uint16 (out_buffer, offset);
malosek's avatar
malosek committed
202 203
    }

204 205
    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
206
        set_pollout (handle);
207
        has_tx_timer = false;
208 209
    }

Martin Sustrik's avatar
Martin Sustrik committed
210 211
    //  Send the data.
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
malosek's avatar
malosek committed
212

Martin Sustrik's avatar
Martin Sustrik committed
213
    //  We can write either all data or 0 which means rate limit reached.
214
    if (nbytes == write_size)
Martin Sustrik's avatar
Martin Sustrik committed
215
        write_size = 0;
216
    else {
Martin Sustrik's avatar
Martin Sustrik committed
217
        zmq_assert (nbytes == 0);
218 219

        if (errno == ENOMEM) {
220
            // Stop polling handle and wait for tx timeout
221 222
            const long timeout = pgm_socket.get_tx_timeout ();
            add_timer (timeout, tx_timer_id);
223
            reset_pollout (handle);
224
            has_tx_timer = true;
225 226
        }
        else
227
            errno_assert (errno == EBUSY);
228 229 230 231 232
    }
}

void zmq::pgm_sender_t::timer_event (int token)
{
233 234 235
    //  Timer cancels on return by poller_base.
    if (token == rx_timer_id) {
        has_rx_timer = false;
236
        in_event ();
237 238 239
    }
    else
    if (token == tx_timer_id) {
240
        // Restart polling handle and retry sending
241
        has_tx_timer = false;
242
        set_pollout (handle);
243
        out_event ();
244 245
    }
    else
246
        zmq_assert (false);
malosek's avatar
malosek committed
247
}
Martin Sustrik's avatar
Martin Sustrik committed
248

malosek's avatar
malosek committed
249 250
#endif