pgm_sender.cpp 6.99 KB
Newer Older
malosek's avatar
malosek committed
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"

malosek's avatar
malosek committed
32
#if defined ZMQ_HAVE_OPENPGM
malosek's avatar
malosek committed
33

34 35 36 37
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif

Martin Sustrik's avatar
Martin Sustrik committed
38
#include <stdlib.h>
malosek's avatar
malosek committed
39 40 41

#include "io_thread.hpp"
#include "pgm_sender.hpp"
42
#include "session_base.hpp"
malosek's avatar
malosek committed
43 44
#include "err.hpp"
#include "wire.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45
#include "stdint.hpp"
malosek's avatar
malosek committed
46

47
//  Constructs the sender engine. parent_ is handed to the io_object_t base
//  and options_ is both stored and forwarded to the PGM socket. Both timer
//  flags start cleared, no session is attached and the output buffer is
//  left unallocated (it is allocated in init ()).
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
                                 const options_t &options_) :
    io_object_t (parent_),
    has_tx_timer (false),
    has_rx_timer (false),
    session (NULL),
    encoder (0),
    more_flag (false),
    pgm_socket (false, options_),
    options (options_),
    out_buffer (NULL),
    out_buffer_size (0),
    write_size (0)
{
    //  Put the working message into an initialised state; failure here
    //  is treated as fatal.
    const int init_rc = msg.init ();
    errno_assert (init_rc == 0);
}

65
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
malosek's avatar
malosek committed
66
{
Martin Sustrik's avatar
Martin Sustrik committed
67 68 69 70 71 72
    int rc = pgm_socket.init (udp_encapsulation_, network_);
    if (rc != 0)
        return rc;

    out_buffer_size = pgm_socket.get_max_tsdu_size ();
    out_buffer = (unsigned char*) malloc (out_buffer_size);
73
    alloc_assert (out_buffer);
74 75

    return rc;
malosek's avatar
malosek committed
76 77
}

78
//  Attaches the engine to session_ and registers the PGM transport's
//  descriptors with the poller. io_thread_ is not used by this function.
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
    session = session_;

    //  The PGM socket exposes four descriptors on the sender side; start
    //  them all as retired and let the socket fill them in.
    fd_t downlink_socket_fd = retired_fd;
    fd_t uplink_socket_fd = retired_fd;
    fd_t rdata_notify_fd = retired_fd;
    fd_t pending_notify_fd = retired_fd;

    pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
        &rdata_notify_fd, &pending_notify_fd);

    //  Register every descriptor with the poller and keep the handles.
    handle = add_fd (downlink_socket_fd);
    uplink_handle = add_fd (uplink_socket_fd);
    rdata_notify_handle = add_fd (rdata_notify_fd);
    pending_notify_handle = add_fd (pending_notify_fd);

    //  Poll for input on the uplink and both notification descriptors.
    //  We never want to stop polling the uplink, i.e. we never want to
    //  stop processing NAKs.
    set_pollin (uplink_handle);
    set_pollin (rdata_notify_handle);
    set_pollin (pending_notify_handle);

    //  Poll for output on the downlink socket.
    set_pollout (handle);
}

void zmq::pgm_sender_t::unplug ()
{
109 110 111 112 113 114 115 116 117 118
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
        has_tx_timer = false;
    }

malosek's avatar
malosek committed
119 120
    rm_fd (handle);
    rm_fd (uplink_handle);
malosek's avatar
malosek committed
121
    rm_fd (rdata_notify_handle);
122
    rm_fd (pending_notify_handle);
123
    session = NULL;
malosek's avatar
malosek committed
124 125
}

126 127 128 129 130 131
//  Detaches the engine from the poller and then destroys it. The object
//  must not be touched after this call returns.
void zmq::pgm_sender_t::terminate ()
{
    unplug ();

    //  The engine deallocates itself.
    delete this;
}

132
void zmq::pgm_sender_t::restart_output ()
malosek's avatar
malosek committed
133 134
{
    set_pollout (handle);
135
    out_event ();
malosek's avatar
malosek committed
136 137
}

138
void zmq::pgm_sender_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
139 140 141 142
{
    zmq_assert (false);
}

malosek's avatar
malosek committed
143 144
//  Closes the working message and releases the output buffer.
zmq::pgm_sender_t::~pgm_sender_t ()
{
    const int close_rc = msg.close ();
    errno_assert (close_rc == 0);

    //  free () is a no-op for a null pointer, so no guard is required
    //  when init () never allocated the buffer.
    free (out_buffer);
    out_buffer = NULL;
}

void zmq::pgm_sender_t::in_event ()
{
156 157 158 159 160
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

161
    //  In-event on sender side means NAK or SPMR receiving from some peer.
malosek's avatar
malosek committed
162
    pgm_socket.process_upstream ();
163
    if (errno == ENOMEM || errno == EBUSY) {
164 165 166 167
        const long timeout = pgm_socket.get_rx_timeout ();
        add_timer (timeout, rx_timer_id);
        has_rx_timer = true;
    }
malosek's avatar
malosek committed
168 169 170 171
}

void zmq::pgm_sender_t::out_event ()
{
172
    //  POLLOUT event from send socket. If write buffer is empty,
malosek's avatar
malosek committed
173
    //  try to read new data from the encoder.
Martin Sustrik's avatar
Martin Sustrik committed
174
    if (write_size == 0) {
malosek's avatar
malosek committed
175

176
        //  First two bytes (sizeof uint16_t) are used to store message
Martin Sustrik's avatar
Martin Sustrik committed
177 178
        //  offset in following steps. Note that by passing our buffer to
        //  the get data function we prevent it from returning its own buffer.
Martin Sustrik's avatar
Martin Sustrik committed
179
        unsigned char *bf = out_buffer + sizeof (uint16_t);
Martin Sustrik's avatar
Martin Sustrik committed
180
        size_t bfsz = out_buffer_size - sizeof (uint16_t);
181 182 183 184 185 186 187 188 189 190 191 192 193 194
        uint16_t offset = 0xffff;

        size_t bytes = encoder.encode (&bf, bfsz);
        while (bytes < bfsz) {
            if (!more_flag && offset == 0xffff)
                offset = static_cast <uint16_t> (bytes);
            int rc = session->pull_msg (&msg);
            if (rc == -1)
                break;
            more_flag = msg.flags () & msg_t::more;
            encoder.load_msg (&msg);
            bf = out_buffer + sizeof (uint16_t) + bytes;
            bytes += encoder.encode (&bf, bfsz - bytes);
        }
malosek's avatar
malosek committed
195 196

        //  If there are no data to write stop polling for output.
197
        if (bytes == 0) {
malosek's avatar
malosek committed
198
            reset_pollout (handle);
Martin Sustrik's avatar
Martin Sustrik committed
199
            return;
malosek's avatar
malosek committed
200 201
        }

202 203
        write_size = sizeof (uint16_t) + bytes;

Martin Sustrik's avatar
Martin Sustrik committed
204
        //  Put offset information in the buffer.
205
        put_uint16 (out_buffer, offset);
malosek's avatar
malosek committed
206 207
    }

208 209
    if (has_tx_timer) {
        cancel_timer (tx_timer_id);
210
        set_pollout (handle);
211
        has_tx_timer = false;
212 213
    }

Martin Sustrik's avatar
Martin Sustrik committed
214 215
    //  Send the data.
    size_t nbytes = pgm_socket.send (out_buffer, write_size);
malosek's avatar
malosek committed
216

Martin Sustrik's avatar
Martin Sustrik committed
217
    //  We can write either all data or 0 which means rate limit reached.
218
    if (nbytes == write_size)
Martin Sustrik's avatar
Martin Sustrik committed
219
        write_size = 0;
220
    else {
Martin Sustrik's avatar
Martin Sustrik committed
221
        zmq_assert (nbytes == 0);
222 223

        if (errno == ENOMEM) {
224
            // Stop polling handle and wait for tx timeout
225 226
            const long timeout = pgm_socket.get_tx_timeout ();
            add_timer (timeout, tx_timer_id);
227
            reset_pollout (handle);
228
            has_tx_timer = true;
229 230
        }
        else
231
            errno_assert (errno == EBUSY);
232 233 234 235 236
    }
}

void zmq::pgm_sender_t::timer_event (int token)
{
237 238 239
    //  Timer cancels on return by poller_base.
    if (token == rx_timer_id) {
        has_rx_timer = false;
240
        in_event ();
241 242 243
    }
    else
    if (token == tx_timer_id) {
244
        // Restart polling handle and retry sending
245
        has_tx_timer = false;
246
        set_pollout (handle);
247
        out_event ();
248 249
    }
    else
250
        zmq_assert (false);
malosek's avatar
malosek committed
251
}
Martin Sustrik's avatar
Martin Sustrik committed
252

malosek's avatar
malosek committed
253 254
#endif