/*
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include <new>
21
#include <stddef.h>
22

Martin Sustrik's avatar
Martin Sustrik committed
23
#include "pipe.hpp"
24
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
    int hwms_ [2], bool delays_ [2])
{
    //   Creates two pipe objects. These objects are connected by two ypipes,
    //   each to pass messages in one direction.

    pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
    alloc_assert (upipe1);
    pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
    alloc_assert (upipe2);

    pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
        hwms_ [1], hwms_ [0], delays_ [0]);
    alloc_assert (pipes_ [0]);
    pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
        hwms_ [0], hwms_ [1], delays_ [1]);
    alloc_assert (pipes_ [1]);

    pipes_ [0]->set_peer (pipes_ [1]);
    pipes_ [1]->set_peer (pipes_ [0]);

    return 0;
}

//  Constructs one endpoint of a pipe.
//
//  parent_    forwarded to the object_t base class.
//  inpipe_    queue this endpoint reads from.
//  outpipe_   queue this endpoint writes to.
//  inhwm_     inbound high watermark; only used to derive the low
//             watermark via compute_lwm.
//  outhwm_    outbound high watermark, stored as 'hwm'.
//  delay_     initial value of the 'delay' flag consulted during
//             termination.
//
//  The endpoint starts in 'active' state with both directions enabled,
//  all message counters at zero and neither peer nor event sink attached.
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
      int inhwm_, int outhwm_, bool delay_) :
    object_t (parent_),
    //  Underlying queues.
    inpipe (inpipe_),
    outpipe (outpipe_),
    //  Both directions are usable from the start.
    in_active (true),
    out_active (true),
    //  Flow-control limits.
    hwm (outhwm_),
    lwm (compute_lwm (inhwm_)),
    //  Flow-control counters.
    msgs_read (0),
    msgs_written (0),
    peers_msgs_read (0),
    //  Attached later via set_peer and set_event_sink.
    peer (NULL),
    sink (NULL),
    state (active),
    delay (delay_)
{
}

69
//  The destructor itself releases nothing; the inbound queue is deleted
//  in process_pipe_term_ack right before the object deletes itself.
zmq::pipe_t::~pipe_t ()
{
}
Martin Sustrik's avatar
Martin Sustrik committed
72

73
//  Attaches the opposite endpoint of the pipe. May be called only once;
//  a second call trips the assertion.
void zmq::pipe_t::set_peer (pipe_t *peer_)
{
    //  No peer may have been attached before.
    zmq_assert (peer == NULL);
    peer = peer_;
}

80
//  Registers the object that receives this pipe's event callbacks
//  (read_activated, write_activated, hiccuped, terminated). May be called
//  only once; a second call trips the assertion.
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
{
    //  No sink may have been registered before.
    zmq_assert (sink == NULL);
    sink = sink_;
}

87
void zmq::pipe_t::set_identity (const blob_t &identity_)
88
{
89
    identity = identity_;
90 91
}

92
//  Returns a copy of the identity blob previously stored by set_identity.
zmq::blob_t zmq::pipe_t::get_identity ()
{
    return this->identity;
}

97
bool zmq::pipe_t::check_read ()
98
{
99
    if (unlikely (!in_active || (state != active && state != pending)))
100 101
        return false;

102
    //  Check if there's an item in the pipe.
103 104
    if (!inpipe->check_read ()) {
        in_active = false;
105
        return false;
106
    }
107 108

    //  If the next item in the pipe is message delimiter,
109 110
    //  initiate termination process.
    if (inpipe->probe (is_delimiter)) {
111
        msg_t msg;
112
        bool ok = inpipe->read (&msg);
113
        zmq_assert (ok);
114
        delimit ();
115 116 117 118
        return false;
    }

    return true;
119 120
}

121
//  Reads one message frame from the pipe into msg_.
//
//  Returns false (without a usable frame) when the inbound side is
//  inactive, when the pipe is in a state other than 'active' or 'pending',
//  when the underlying queue is empty (the inbound side is then marked
//  inactive), or when the frame read is the delimiter (delimit is invoked).
//  Returns true when a regular frame was stored in msg_.
bool zmq::pipe_t::read (msg_t *msg_)
{
    //  Reading is allowed only in 'active' and 'pending' states.
    const bool readable_state = (state == active || state == pending);
    if (unlikely (!in_active || !readable_state))
        return false;

    //  Empty queue: remember that the inbound side went idle.
    if (!inpipe->read (msg_)) {
        in_active = false;
        return false;
    }

    //  A delimiter starts the termination process of the pipe and is never
    //  passed to the caller.
    if (msg_->is_delimiter ()) {
        delimit ();
        return false;
    }

    //  Only the final frame of a message (no 'more' flag) is counted.
    const bool final_frame = (msg_->flags () & msg_t::more) == 0;
    if (final_frame)
        msgs_read++;

    //  Each time the counter is a multiple of the low watermark, tell the
    //  peer how many messages have been read so far.
    //  NOTE(review): the check runs after every frame, so it can fire again
    //  with an unchanged counter for non-final frames.
    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}

146
bool zmq::pipe_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
147
{
148
    if (unlikely (!out_active || state != active))
149
        return false;
150

151
    bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
Martin Sustrik's avatar
Martin Sustrik committed
152

153 154 155 156 157 158
    if (unlikely (full)) {
        out_active = false;
        return false;
    }

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
159 160
}

161
//  Writes one message frame to the outbound queue.
//
//  Returns false, leaving msg_ untouched, when check_write refuses the
//  write. Otherwise hands the frame to the queue and returns true. The
//  written-message counter advances only on the final frame of a message.
bool zmq::pipe_t::write (msg_t *msg_)
{
    if (unlikely (!check_write ()))
        return false;

    //  Frames carrying the 'more' flag belong to a message that is still
    //  incomplete; the queue is told so.
    const bool incomplete = (msg_->flags () & msg_t::more) != 0;
    outpipe->write (*msg_, incomplete);

    if (!incomplete)
        msgs_written++;

    return true;
}

174
void zmq::pipe_t::rollback ()
Martin Sustrik's avatar
Martin Sustrik committed
175
{
176 177
    //  Remove incomplete message from the outbound pipe.
    msg_t msg;
178
    if (outpipe) {
179 180 181 182 183
        while (outpipe->unwrite (&msg)) {
            zmq_assert (msg.flags () & msg_t::more);
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
184
    }
Martin Sustrik's avatar
Martin Sustrik committed
185 186
}

187
void zmq::pipe_t::flush ()
Martin Sustrik's avatar
Martin Sustrik committed
188
{
189 190 191 192
    //  The peer does not exist anymore at this point.
    if (state == terminating)
        return;

193
    if (outpipe && !outpipe->flush ())
194
        send_activate_read (peer);
Martin Sustrik's avatar
Martin Sustrik committed
195 196
}

197
void zmq::pipe_t::process_activate_read ()
unknown's avatar
unknown committed
198
{
199
    if (!in_active && (state == active || state == pending)) {
200 201 202
        in_active = true;
        sink->read_activated (this);
    }
unknown's avatar
unknown committed
203 204
}

205
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
Martin Sustrik's avatar
Martin Sustrik committed
206
{
207 208
    //  Remember the peers's message sequence number.
    peers_msgs_read = msgs_read_;
209

210
    if (!out_active && state == active) {
211 212
        out_active = true;
        sink->write_activated (this);
Martin Hurton's avatar
Martin Hurton committed
213
    }
Martin Sustrik's avatar
Martin Sustrik committed
214 215
}

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
    zmq_assert (outpipe);
    outpipe->flush ();
    msg_t msg;
    while (outpipe->read (&msg)) {
       int rc = msg.close ();
       errno_assert (rc == 0);
    }
    delete outpipe;

    //  Plug in the new outpipe.
    zmq_assert (pipe_);
    outpipe = (upipe_t*) pipe_;
    out_active = true;

    //  If appropriate, notify the user about the hiccup.
    if (state == active)
        sink->hiccuped (this);
}

239
void zmq::pipe_t::process_pipe_term ()
Martin Sustrik's avatar
Martin Sustrik committed
240
{
241 242 243 244 245 246 247 248
    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
    //  pending messages, we can move directly to the terminating state.
    //  Otherwise we'll hang up in pending state till all the pending messages
    //  are sent.
    if (state == active) {
        if (!delay) {
            state = terminating;
249
            outpipe = NULL;
250 251
            send_pipe_term_ack (peer);
        }
Martin Hurton's avatar
Martin Hurton committed
252
        else
253
            state = pending;
Martin Hurton's avatar
Martin Hurton committed
254
        return;
255 256 257 258 259 260
    }

    //  Delimiter happened to arrive before the term command. Now we have the
    //  term command as well, so we can move straight to terminating state.
    if (state == delimited) {
        state = terminating;
261
        outpipe = NULL;
262 263 264 265 266 267 268 269 270
        send_pipe_term_ack (peer);
        return;
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
    if (state == terminated) {
        state = double_terminated;
271
        outpipe = NULL;
272
        send_pipe_term_ack (peer);
273
        return;
274
    }
275 276 277

    //  pipe_term is invalid in other states.
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
278 279
}

280
void zmq::pipe_t::process_pipe_term_ack ()
281
{
282 283 284 285
    //  Notify the user that all the references to the pipe should be dropped.
    zmq_assert (sink);
    sink->terminated (this);

286 287 288 289
    //  In terminating and double_terminated states there's nothing to do.
    //  Simply deallocate the pipe. In terminated state we have to ack the
    //  peer before deallocating this side of the pipe. All the other states
    //  are invalid.
Martin Hurton's avatar
Martin Hurton committed
290
    if (state == terminated) {
291
        outpipe = NULL;
292
        send_pipe_term_ack (peer);
293
    }
294
    else
Martin Hurton's avatar
Martin Hurton committed
295
        zmq_assert (state == terminating || state == double_terminated);
296 297 298 299 300 301

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.
302
    msg_t msg;
303 304 305
    while (inpipe->read (&msg)) {
       int rc = msg.close ();
       errno_assert (rc == 0);
Martin Hurton's avatar
Martin Hurton committed
306
    }
307
    delete inpipe;
308

309 310
    //  Deallocate the pipe object
    delete this;
Martin Sustrik's avatar
Martin Sustrik committed
311 312
}

313
void zmq::pipe_t::terminate (bool delay_)
Martin Sustrik's avatar
Martin Sustrik committed
314
{
315 316 317
    //  Overload the value specified at pipe creation.
    delay = delay_;

318 319
    //  If terminate was already called, we can ignore the duplicit invocation.
    if (state == terminated || state == double_terminated)
320
        return;
321 322 323

    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
324 325
    else 
    if (state == terminating)
326 327 328 329
        return;

    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
330 331
    else 
    if (state == active) {
332 333 334 335 336 337
        send_pipe_term (peer);
        state = terminated;
    }

    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
338 339
    else 
    if (state == pending && !delay) {
Martin Hurton's avatar
Martin Hurton committed
340 341 342
        outpipe = NULL;
        send_pipe_term_ack (peer);
        state = terminating;
343 344 345
    }

    //  If there are pending messages still availabe, do nothing.
346 347
    else 
    if (state == pending) {
348 349 350 351
    }

    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
Martin Hurton's avatar
Martin Hurton committed
352
    //  active state.
353 354
    else 
    if (state == delimited) {
355
        send_pipe_term (peer);
Sergey KHripchenko's avatar
Sergey KHripchenko committed
356
        state = terminated;
357 358 359 360 361
    }

    //  There are no other states.
    else
        zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
362

363
    //  Stop outbound flow of messages.
364
    out_active = false;
Martin Hurton's avatar
Martin Hurton committed
365

366
    if (outpipe) {
367

368 369
        //  Drop any unfinished outbound messages.
        rollback ();
370

371 372 373 374 375 376
        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
        outpipe->write (msg, false);
        flush ();
377
    }
Martin Sustrik's avatar
Martin Sustrik committed
378 379
}

380
//  Predicate handed to the queue's probe call in check_read: returns true
//  if msg_ is a delimiter message.
bool zmq::pipe_t::is_delimiter (msg_t &msg_)
{
    const bool delimiter = msg_.is_delimiter ();
    return delimiter;
}
384

385
int zmq::pipe_t::compute_lwm (int hwm_)
386
{
387
    //  Compute the low water mark. Following point should be taken
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
    //  into consideration:
    //
    //  1. LWM has to be less than HWM.
    //  2. LWM cannot be set to very low value (such as zero) as after filling
    //     the queue it would start to refill only after all the messages are
    //     read from it and thus unnecessarily hold the progress back.
    //  3. LWM cannot be set to very high value (such as HWM-1) as it would
    //     result in lock-step filling of the queue - if a single message is
    //     read from a full queue, writer thread is resumed to write exactly one
    //     message to the queue and go back to sleep immediately. This would
    //     result in low performance.
    //
    //  Given the 3. it would be good to keep HWM and LWM as far apart as
    //  possible to reduce the thread switching overhead to almost zero,
    //  say HWM-LWM should be max_wm_delta.
    //
    //  That done, we still we have to account for the cases where
    //  HWM < max_wm_delta thus driving LWM to negative numbers.
    //  Let's make LWM 1/2 of HWM in such cases.
407
    int result = (hwm_ > max_wm_delta * 2) ?
408 409
        hwm_ - max_wm_delta : (hwm_ + 1) / 2;

410
    return result;
411
}
412 413 414 415 416 417 418 419 420

void zmq::pipe_t::delimit ()
{
    if (state == active) {
        state = delimited;
        return;
    }

    if (state == pending) {
421
        outpipe = NULL;
422 423 424 425 426 427 428 429
        send_pipe_term_ack (peer);
        state = terminating;
        return;
    }

    //  Delimiter in any other state is invalid.
    zmq_assert (false);
}
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449

void zmq::pipe_t::hiccup ()
{
    //  If termination is already under way do nothing.
    if (state != active)
        return;

    //  We'll drop the pointer to the inpipe. From now on, the peer is
    //  responsible for deallocating it.
    inpipe = NULL;

    //  Create new inpipe.
    inpipe = new (std::nothrow) pipe_t::upipe_t ();
    alloc_assert (inpipe);
    in_active = true;

    //  Notify the peer about the hiccup.
    send_hiccup (peer, (void*) inpipe);
}