/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include <new>
31
#include <stddef.h>
32

Martin Sustrik's avatar
Martin Sustrik committed
33
#include "pipe.hpp"
34
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35

36 37 38
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"

39
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
Ian Barber's avatar
Ian Barber committed
40
    int hwms_ [2], bool conflate_ [2])
41 42 43 44
{
    //   Creates two pipe objects. These objects are connected by two ypipes,
    //   each to pass messages in one direction.

45 46
    typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
    typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
47 48 49 50 51 52

    pipe_t::upipe_t *upipe1;
    if(conflate_ [0])
        upipe1 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe1 = new (std::nothrow) upipe_normal_t ();
53
    alloc_assert (upipe1);
54 55 56 57 58 59

    pipe_t::upipe_t *upipe2;
    if(conflate_ [1])
        upipe2 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe2 = new (std::nothrow) upipe_normal_t ();
60 61 62
    alloc_assert (upipe2);

    pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
Ian Barber's avatar
Ian Barber committed
63
        hwms_ [1], hwms_ [0], conflate_ [0]);
64 65
    alloc_assert (pipes_ [0]);
    pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
Ian Barber's avatar
Ian Barber committed
66
        hwms_ [0], hwms_ [1], conflate_ [1]);
67 68 69 70 71 72 73 74 75
    alloc_assert (pipes_ [1]);

    pipes_ [0]->set_peer (pipes_ [1]);
    pipes_ [1]->set_peer (pipes_ [0]);

    return 0;
}

//  Constructs one end of a pipe pair.
//
//  parent_    passed on to the object_t base class.
//  inpipe_    queue this end reads from.
//  outpipe_   queue this end writes to.
//  inhwm_     inbound high water mark; only used to derive the low water
//             mark via compute_lwm ().
//  outhwm_    outbound high water mark, stored as hwm.
//  conflate_  remembered so that later code (hiccup, termination) can
//             treat a conflating queue differently.
//
//  The pipe starts in the active state with both directions enabled, no
//  peer and no event sink attached, and delayed termination switched on.
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
      int inhwm_, int outhwm_, bool conflate_) :
    object_t (parent_),
    inpipe (inpipe_),
    outpipe (outpipe_),
    in_active (true),
    out_active (true),
    hwm (outhwm_),
    lwm (compute_lwm (inhwm_)),
    inhwmboost(0),
    outhwmboost(0),
    msgs_read (0),
    msgs_written (0),
    peers_msgs_read (0),
    peer (NULL),
    sink (NULL),
    state (active),
    delay (true),
    conflate (conflate_)
{
    //  routing_id is not covered by the initialiser list above. Give it a
    //  defined value here so that get_routing_id () never returns an
    //  indeterminate value when set_routing_id () has not been called.
    //  (Assigned in the body rather than the list because the member's
    //  declaration order is not visible from this file.)
    routing_id = 0;
}

97
//  Intentionally empty: the inbound queue is deleted explicitly in
//  process_pipe_term_ack () just before the pipe deletes itself.
zmq::pipe_t::~pipe_t ()
{
}
Martin Sustrik's avatar
Martin Sustrik committed
100

101
//  Records the opposite end of the pipe pair. May be called only once;
//  a second call trips the assertion.
void zmq::pipe_t::set_peer (pipe_t *peer_)
{
    zmq_assert (!peer);
    this->peer = peer_;
}

108
//  Registers the object that is notified about pipe events (read/write
//  activation, hiccup, termination). May be called only once; a second
//  call trips the assertion.
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
{
    zmq_assert (!sink);
    this->sink = sink_;
}

115 116 117 118 119 120 121 122 123 124
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
{
    routing_id = routing_id_;
}

uint32_t zmq::pipe_t::get_routing_id ()
{
    return routing_id;
}

125
void zmq::pipe_t::set_identity (const blob_t &identity_)
126
{
127
    identity = identity_;
128 129
}

130
//  Returns a copy of the identity blob stored by set_identity ().
zmq::blob_t zmq::pipe_t::get_identity ()
{
    return this->identity;
}

135 136 137 138 139
//  Returns a copy of the most recent credential saved by read ().
zmq::blob_t zmq::pipe_t::get_credential () const
{
    return this->credential;
}

140
bool zmq::pipe_t::check_read ()
141
{
142 143 144
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
145 146
        return false;

147
    //  Check if there's an item in the pipe.
148 149
    if (!inpipe->check_read ()) {
        in_active = false;
150
        return false;
151
    }
152 153

    //  If the next item in the pipe is message delimiter,
154 155
    //  initiate termination process.
    if (inpipe->probe (is_delimiter)) {
156
        msg_t msg;
157
        bool ok = inpipe->read (&msg);
158
        zmq_assert (ok);
159
        process_delimiter ();
160 161 162 163
        return false;
    }

    return true;
164 165
}

166
//  Reads the next message from the pipe into msg_.
//
//  Returns false when nothing can be read: the inbound side is inactive,
//  the pipe is in a state that does not allow reading, the queue is
//  empty, or the item read was the delimiter. Credential frames are
//  never returned to the caller; they are stored and skipped.
bool zmq::pipe_t::read (msg_t *msg_)
{
    const bool readable_state =
        state == active || state == waiting_for_delimiter;
    if (unlikely (!in_active || !readable_state))
        return false;

    //  Fetch items until something other than a credential shows up.
    while (true) {
        if (!inpipe->read (msg_)) {
            in_active = false;
            return false;
        }

        //  A credential is copied aside and closed; then the next item
        //  is fetched.
        if (unlikely (msg_->is_credential ())) {
            const unsigned char *data =
                static_cast <const unsigned char *> (msg_->data ());
            credential = blob_t (data, msg_->size ());
            const int rc = msg_->close ();
            zmq_assert (rc == 0);
            continue;
        }
        break;
    }

    //  A delimiter starts the termination process of the pipe and is not
    //  reported to the caller.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    //  Only the last frame of a message is counted, and identity
    //  messages are not counted at all.
    const bool last_frame = !(msg_->flags () & msg_t::more);
    if (last_frame && !msg_->is_identity ())
        msgs_read++;

    //  Each time the read counter is a multiple of the low water mark,
    //  report the counter to the peer.
    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}

203
bool zmq::pipe_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
204
{
205
    if (unlikely (!out_active || state != active))
206
        return false;
207

208
    bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
Martin Sustrik's avatar
Martin Sustrik committed
209

210 211 212 213 214 215
    if (unlikely (full)) {
        out_active = false;
        return false;
    }

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
216 217
}

218
//  Writes msg_ into the outbound queue. Returns false, without touching
//  the message, when check_write () reports that writing is not
//  possible. The message is not flushed; see flush ().
bool zmq::pipe_t::write (msg_t *msg_)
{
    if (unlikely (!check_write ()))
        return false;

    //  Both properties are sampled before the message is handed to the
    //  queue.
    const bool has_more = (msg_->flags () & msg_t::more) != 0;
    const bool is_identity = msg_->is_identity ();

    outpipe->write (*msg_, has_more);

    //  Only the last frame of a non-identity message is counted.
    if (!has_more && !is_identity)
        msgs_written++;

    return true;
}

232
void zmq::pipe_t::rollback ()
Martin Sustrik's avatar
Martin Sustrik committed
233
{
234 235
    //  Remove incomplete message from the outbound pipe.
    msg_t msg;
236
    if (outpipe) {
237 238 239 240 241
        while (outpipe->unwrite (&msg)) {
            zmq_assert (msg.flags () & msg_t::more);
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
242
    }
Martin Sustrik's avatar
Martin Sustrik committed
243 244
}

245
void zmq::pipe_t::flush ()
Martin Sustrik's avatar
Martin Sustrik committed
246
{
247
    //  The peer does not exist anymore at this point.
248
    if (state == term_ack_sent)
249 250
        return;

251
    if (outpipe && !outpipe->flush ())
252
        send_activate_read (peer);
Martin Sustrik's avatar
Martin Sustrik committed
253 254
}

255
void zmq::pipe_t::process_activate_read ()
unknown's avatar
unknown committed
256
{
257
    if (!in_active && (state == active || state == waiting_for_delimiter)) {
258 259 260
        in_active = true;
        sink->read_activated (this);
    }
unknown's avatar
unknown committed
261 262
}

263
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
Martin Sustrik's avatar
Martin Sustrik committed
264
{
265 266
    //  Remember the peers's message sequence number.
    peers_msgs_read = msgs_read_;
267

268
    if (!out_active && state == active) {
269 270
        out_active = true;
        sink->write_activated (this);
Martin Hurton's avatar
Martin Hurton committed
271
    }
Martin Sustrik's avatar
Martin Sustrik committed
272 273
}

274 275 276 277 278 279 280 281
void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
    zmq_assert (outpipe);
    outpipe->flush ();
    msg_t msg;
    while (outpipe->read (&msg)) {
282 283
       if (!(msg.flags () & msg_t::more))
            msgs_written--;
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
       int rc = msg.close ();
       errno_assert (rc == 0);
    }
    delete outpipe;

    //  Plug in the new outpipe.
    zmq_assert (pipe_);
    outpipe = (upipe_t*) pipe_;
    out_active = true;

    //  If appropriate, notify the user about the hiccup.
    if (state == active)
        sink->hiccuped (this);
}

299
void zmq::pipe_t::process_pipe_term ()
Martin Sustrik's avatar
Martin Sustrik committed
300
{
Martin Hurton's avatar
Martin Hurton committed
301 302 303 304
    zmq_assert (state == active
            ||  state == delimiter_received
            ||  state == term_req_sent1);

305 306
    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
307 308 309
    //  pending messages, we can move directly to the term_ack_sent state.
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
    //  pending messages are read.
310
    if (state == active) {
Martin Hurton's avatar
Martin Hurton committed
311 312 313
        if (delay)
            state = waiting_for_delimiter;
        else {
314
            state = term_ack_sent;
315
            outpipe = NULL;
316 317 318 319 320
            send_pipe_term_ack (peer);
        }
    }

    //  Delimiter happened to arrive before the term command. Now we have the
321
    //  term command as well, so we can move straight to term_ack_sent state.
Martin Hurton's avatar
Martin Hurton committed
322
    else
323 324
    if (state == delimiter_received) {
        state = term_ack_sent;
325
        outpipe = NULL;
326 327 328 329 330 331
        send_pipe_term_ack (peer);
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
Martin Hurton's avatar
Martin Hurton committed
332
    else
333 334
    if (state == term_req_sent1) {
        state = term_req_sent2;
335
        outpipe = NULL;
336 337
        send_pipe_term_ack (peer);
    }
Martin Sustrik's avatar
Martin Sustrik committed
338 339
}

340
void zmq::pipe_t::process_pipe_term_ack ()
341
{
342 343
    //  Notify the user that all the references to the pipe should be dropped.
    zmq_assert (sink);
344
    sink->pipe_terminated (this);
345

346 347 348 349 350
    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
    //  the peer before deallocating this side of the pipe.
    //  All the other states are invalid.
    if (state == term_req_sent1) {
351
        outpipe = NULL;
352
        send_pipe_term_ack (peer);
353
    }
354
    else
355
        zmq_assert (state == term_ack_sent || state == term_req_sent2);
356 357 358 359 360 361

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.
362

danielkr's avatar
danielkr committed
363
    if (!conflate) {
364 365 366 367 368
        msg_t msg;
        while (inpipe->read (&msg)) {
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
Martin Hurton's avatar
Martin Hurton committed
369
    }
370

371
    delete inpipe;
372

373 374
    //  Deallocate the pipe object
    delete this;
Martin Sustrik's avatar
Martin Sustrik committed
375 376
}

Ian Barber's avatar
Ian Barber committed
377 378 379 380 381
void zmq::pipe_t::set_nodelay ()
{
    this->delay = false;
}

382
void zmq::pipe_t::terminate (bool delay_)
Martin Sustrik's avatar
Martin Sustrik committed
383
{
384 385 386
    //  Overload the value specified at pipe creation.
    delay = delay_;

387
    //  If terminate was already called, we can ignore the duplicit invocation.
388
    if (state == term_req_sent1 || state == term_req_sent2)
389
        return;
390 391 392

    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
393 394
    else
    if (state == term_ack_sent)
395 396 397 398
        return;

    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
399
    else
400
    if (state == active) {
401
        send_pipe_term (peer);
402
        state = term_req_sent1;
403 404 405 406
    }

    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
407
    else
Martin Hurton's avatar
Martin Hurton committed
408
    if (state == waiting_for_delimiter && !delay) {
Martin Hurton's avatar
Martin Hurton committed
409 410
        outpipe = NULL;
        send_pipe_term_ack (peer);
411
        state = term_ack_sent;
412 413
    }

Sergey M․'s avatar
Sergey M․ committed
414
    //  If there are pending messages still available, do nothing.
415 416
    else
    if (state == waiting_for_delimiter) {
417 418 419 420
    }

    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
Martin Hurton's avatar
Martin Hurton committed
421
    //  active state.
422 423
    else
    if (state == delimiter_received) {
424
        send_pipe_term (peer);
425
        state = term_req_sent1;
426 427 428 429 430
    }

    //  There are no other states.
    else
        zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
431

432
    //  Stop outbound flow of messages.
433
    out_active = false;
Martin Hurton's avatar
Martin Hurton committed
434

435
    if (outpipe) {
436

437 438
        //  Drop any unfinished outbound messages.
        rollback ();
439

440 441 442 443 444 445
        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
        outpipe->write (msg, false);
        flush ();
446
    }
Martin Sustrik's avatar
Martin Sustrik committed
447 448
}

449
bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
Martin Sustrik's avatar
Martin Sustrik committed
450
{
451
    return msg_.is_delimiter ();
Martin Sustrik's avatar
Martin Sustrik committed
452
}
453

454
int zmq::pipe_t::compute_lwm (int hwm_)
455
{
456
    //  Compute the low water mark. Following point should be taken
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475
    //  into consideration:
    //
    //  1. LWM has to be less than HWM.
    //  2. LWM cannot be set to very low value (such as zero) as after filling
    //     the queue it would start to refill only after all the messages are
    //     read from it and thus unnecessarily hold the progress back.
    //  3. LWM cannot be set to very high value (such as HWM-1) as it would
    //     result in lock-step filling of the queue - if a single message is
    //     read from a full queue, writer thread is resumed to write exactly one
    //     message to the queue and go back to sleep immediately. This would
    //     result in low performance.
    //
    //  Given the 3. it would be good to keep HWM and LWM as far apart as
    //  possible to reduce the thread switching overhead to almost zero,
    //  say HWM-LWM should be max_wm_delta.
    //
    //  That done, we still we have to account for the cases where
    //  HWM < max_wm_delta thus driving LWM to negative numbers.
    //  Let's make LWM 1/2 of HWM in such cases.
476
    int result = (hwm_ > max_wm_delta * 2) ?
477 478
        hwm_ - max_wm_delta : (hwm_ + 1) / 2;

479
    return result;
480
}
481

482
void zmq::pipe_t::process_delimiter ()
483
{
484 485
    zmq_assert (state == active
            ||  state == waiting_for_delimiter);
486

487 488 489
    if (state == active)
        state = delimiter_received;
    else {
490
        outpipe = NULL;
491
        send_pipe_term_ack (peer);
492
        state = term_ack_sent;
493 494
    }
}
495 496 497 498 499 500 501 502 503 504 505 506

void zmq::pipe_t::hiccup ()
{
    //  If termination is already under way do nothing.
    if (state != active)
        return;

    //  We'll drop the pointer to the inpipe. From now on, the peer is
    //  responsible for deallocating it.
    inpipe = NULL;

    //  Create new inpipe.
507 508
    if (conflate)
        inpipe = new (std::nothrow)
Eric Cornelius's avatar
Eric Cornelius committed
509
            ypipe_conflate_t <msg_t> ();
510 511
    else
        inpipe = new (std::nothrow)
Eric Cornelius's avatar
Eric Cornelius committed
512
            ypipe_t <msg_t, message_pipe_granularity> ();
513

514 515 516 517 518 519 520
    alloc_assert (inpipe);
    in_active = true;

    //  Notify the peer about the hiccup.
    send_hiccup (peer, (void*) inpipe);
}

521 522
//  Recomputes the water marks from the given inbound and outbound high
//  water marks combined with the stored boosts. The inbound value only
//  feeds the low water mark; the outbound value becomes hwm.
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
    //  If either the hwm or its boost is <= 0 the limit is treated as
    //  infinite, which is represented by 0.
    const bool in_unlimited = inhwm_ <= 0 || inhwmboost <= 0;
    const bool out_unlimited = outhwm_ <= 0 || outhwmboost <= 0;

    const int in = in_unlimited ? 0 : inhwm_ + inhwmboost;
    const int out = out_unlimited ? 0 : outhwm_ + outhwmboost;

    lwm = compute_lwm (in);
    hwm = out;
}

void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
{
    inhwmboost = inhwmboost_;
    outhwmboost = outhwmboost_;
541
}
542

Martin Hurton's avatar
Martin Hurton committed
543
bool zmq::pipe_t::check_hwm () const
544 545 546 547
{
    bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm - 1);
    return( !full );
}