/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"

malosek's avatar
malosek committed
22
#if defined ZMQ_HAVE_OPENPGM
23

24 25
#include <new>

26 27 28 29
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif

30
#include "pgm_receiver.hpp"
31
#include "session_base.hpp"
32
#include "v1_decoder.hpp"
33 34
#include "stdint.hpp"
#include "wire.hpp"
35
#include "err.hpp"
36 37

//  Constructs the receiver engine on top of the given I/O thread object.
//  The engine starts with no RX timer armed, no session attached, no
//  stalled peer and no pending input. The PGM socket member is built from
//  the same options (its first argument, `true`, presumably selects
//  receiver mode -- confirm against pgm_socket_t).
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
                                     const options_t &options_) :
    io_object_t (parent_),
    has_rx_timer (false),
    pgm_socket (true, options_),
    options (options_),
    session (NULL),
    active_tsi (NULL),
    insize (0)
{
}

//  Destroys the engine. The peer table must already be empty, which is
//  the state unplug () leaves it in; destroying a still-plugged engine
//  trips the assertion.
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
    zmq_assert (peers.empty ());
}

55
int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
56
{
57
    return pgm_socket.init (udp_encapsulation_, network_);
58 59
}

60 61
//  Attaches the engine to a session and starts polling the PGM
//  descriptors for input. The io_thread_ argument is not used by this
//  implementation.
void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
                                session_base_t *session_)
{
    //  Ask the PGM socket which descriptors have to be watched.
    fd_t data_fd = retired_fd;
    fd_t notify_fd = retired_fd;
    pgm_socket.get_receiver_fds (&data_fd, &notify_fd);

    //  Register both descriptors with the poller and enable input events.
    socket_handle = add_fd (data_fd);
    pipe_handle = add_fd (notify_fd);
    set_pollin (pipe_handle);
    set_pollin (socket_handle);

    session = session_;

    //  Subscriptions that were queued in the session before the engine
    //  was plugged in are discarded.
    drop_subscriptions ();
}

void zmq::pgm_receiver_t::unplug ()
{
malosek's avatar
malosek committed
80
    //  Delete decoders.
81
    for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
malosek's avatar
malosek committed
82 83 84 85
        if (it->second.decoder != NULL)
            delete it->second.decoder;
    }
    peers.clear ();
86
    active_tsi = NULL;
87

88 89 90 91 92
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

93 94
    rm_fd (socket_handle);
    rm_fd (pipe_handle);
Martin Sustrik's avatar
Martin Sustrik committed
95

96
    session = NULL;
97 98
}

99 100 101 102 103 104
//  Shuts the engine down: unplugs it and then destroys the object itself.
//  The object must not be touched after this call returns.
void zmq::pgm_receiver_t::terminate ()
{
    unplug ();

    //  The engine owns itself; nothing may follow this statement.
    delete this;
}

105
void zmq::pgm_receiver_t::restart_output ()
106
{
107
    drop_subscriptions ();
108 109
}

110
void zmq::pgm_receiver_t::restart_input ()
Martin Hurton's avatar
Martin Hurton committed
111
{
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
    zmq_assert (session != NULL);
    zmq_assert (active_tsi != NULL);

    const peers_t::iterator it = peers.find (*active_tsi);
    zmq_assert (it != peers.end ());
    zmq_assert (it->second.joined);

    //  Push the pending message into the session.
    int rc = session->push_msg (it->second.decoder->msg ());
    errno_assert (rc == 0);

    if (insize > 0) {
        rc = process_input (it->second.decoder);
        if (rc == -1) {
            //  HWM reached; we will try later.
            if (errno == EAGAIN) {
                session->flush ();
                return;
            }
            //  Data error. Delete message decoder, mark the
            //  peer as not joined and drop remaining data.
            it->second.joined = false;
            delete it->second.decoder;
            it->second.decoder = NULL;
            insize = 0;
137
        }
138 139 140 141 142 143
    }

    //  Resume polling.
    set_pollin (pipe_handle);
    set_pollin (socket_handle);

144
    active_tsi = NULL;
145
    in_event ();
Martin Hurton's avatar
Martin Hurton committed
146 147
}

148 149
void zmq::pgm_receiver_t::in_event ()
{
Martin Sustrik's avatar
Martin Sustrik committed
150
    // Read data from the underlying pgm_socket.
malosek's avatar
malosek committed
151
    const pgm_tsi_t *tsi = NULL;
152

153 154 155 156 157
    if (has_rx_timer) {
        cancel_timer (rx_timer_id);
        has_rx_timer = false;
    }

158 159 160 161 162
    //  TODO: This loop can effectively block other engines in the same I/O
    //  thread in the case of high load.
    while (true) {

        //  Get new batch of data.
163 164 165
        //  Note the workaround made not to break strict-aliasing rules.
        void *tmp = NULL;
        ssize_t received = pgm_socket.receive (&tmp, &tsi);
166
        inpos = (unsigned char*) tmp;
167 168 169

        //  No data to process. This may happen if the packet received is
        //  neither ODATA nor ODATA.
170
        if (received == 0) {
171
            if (errno == ENOMEM || errno == EBUSY) {
172 173 174 175
                const long timeout = pgm_socket.get_rx_timeout ();
                add_timer (timeout, rx_timer_id);
                has_rx_timer = true;
            }
176
            break;
177
        }
178 179 180 181 182 183

        //  Find the peer based on its TSI.
        peers_t::iterator it = peers.find (*tsi);

        //  Data loss. Delete decoder and mark the peer as disjoint.
        if (received == -1) {
184 185 186 187 188 189
            if (it != peers.end ()) {
                it->second.joined = false;
                if (it->second.decoder != NULL) {
                    delete it->second.decoder;
                    it->second.decoder = NULL;
                }
190 191
            }
            break;
malosek's avatar
malosek committed
192
        }
193

194 195 196
        //  New peer. Add it to the list of know but unjoint peers.
        if (it == peers.end ()) {
            peer_info_t peer_info = {false, NULL};
197
            it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
198
        }
199

200 201
        insize = static_cast <size_t> (received);

202
        //  Read the offset of the fist message in the current packet.
203 204 205 206
        zmq_assert (insize >= sizeof (uint16_t));
        uint16_t offset = get_uint16 (inpos);
        inpos += sizeof (uint16_t);
        insize -= sizeof (uint16_t);
207

208 209
        //  Join the stream if needed.
        if (!it->second.joined) {
210

211 212 213 214
            //  There is no beginning of the message in current packet.
            //  Ignore the data.
            if (offset == 0xffff)
                continue;
215

216
            zmq_assert (offset <= insize);
217
            zmq_assert (it->second.decoder == NULL);
218

219
            //  We have to move data to the begining of the first message.
220 221
            inpos += offset;
            insize -= offset;
malosek's avatar
malosek committed
222

223 224
            //  Mark the stream as joined.
            it->second.joined = true;
malosek's avatar
malosek committed
225

226
            //  Create and connect decoder for the peer.
227 228
            it->second.decoder = new (std::nothrow)
                v1_decoder_t (0, options.maxmsgsize);
229
            alloc_assert (it->second.decoder);
230
        }
malosek's avatar
malosek committed
231

232 233 234 235 236 237 238 239 240 241
        int rc = process_input (it->second.decoder);
        if (rc == -1) {
            if (errno == EAGAIN) {
                active_tsi = tsi;

                //  Stop polling.
                reset_pollin (pipe_handle);
                reset_pollin (socket_handle);

                break;
242 243
            }

244 245 246 247
            it->second.joined = false;
            delete it->second.decoder;
            it->second.decoder = NULL;
            insize = 0;
248
        }
Martin Sustrik's avatar
Martin Sustrik committed
249
    }
250 251

    //  Flush any messages decoder may have produced.
252
    session->flush ();
253
}
malosek's avatar
malosek committed
254

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
//  Runs the buffered input (inpos / insize) through the given decoder and
//  pushes every decoded message to the session.
//
//  Returns 0 when the decoder stops without an error or the buffer is
//  exhausted. Returns -1 when the decoder reports an error, or when the
//  session refuses a message; in the latter case errno is EAGAIN and the
//  refused message is still held by the decoder.
int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
    zmq_assert (session != NULL);

    while (insize > 0) {
        size_t consumed = 0;
        const int decoded = decoder->decode (inpos, insize, consumed);
        if (decoded == -1)
            return -1;

        //  Advance past the bytes the decoder has taken.
        inpos += consumed;
        insize -= consumed;

        //  A zero result means no message is ready yet; stop here.
        if (decoded == 0)
            break;

        if (session->push_msg (decoder->msg ()) == -1) {
            errno_assert (errno == EAGAIN);
            return -1;
        }
    }
    return 0;
}


278 279 280 281
void zmq::pgm_receiver_t::timer_event (int token)
{
    zmq_assert (token == rx_timer_id);

282 283
    //  Timer cancels on return by poller_base.
    has_rx_timer = false;
284 285 286
    in_event ();
}

287 288 289
void zmq::pgm_receiver_t::drop_subscriptions ()
{
    msg_t msg;
290
    msg.init ();
Ian Barber's avatar
Ian Barber committed
291
    while (session->pull_msg (&msg))
292 293 294
        msg.close ();
}

295 296
#endif