/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include <new>
#include <stddef.h>

#include "macros.hpp"
#include "pipe.hpp"
#include "err.hpp"

#include "ypipe.hpp"
#include "ypipe_conflate.hpp"

int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
Ian Barber's avatar
Ian Barber committed
42
    int hwms_ [2], bool conflate_ [2])
43 44 45 46
{
    //   Creates two pipe objects. These objects are connected by two ypipes,
    //   each to pass messages in one direction.

47 48
    typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
    typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
49 50 51 52 53 54

    pipe_t::upipe_t *upipe1;
    if(conflate_ [0])
        upipe1 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe1 = new (std::nothrow) upipe_normal_t ();
55
    alloc_assert (upipe1);
56 57 58 59 60 61

    pipe_t::upipe_t *upipe2;
    if(conflate_ [1])
        upipe2 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe2 = new (std::nothrow) upipe_normal_t ();
62 63 64
    alloc_assert (upipe2);

    pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
Ian Barber's avatar
Ian Barber committed
65
        hwms_ [1], hwms_ [0], conflate_ [0]);
66 67
    alloc_assert (pipes_ [0]);
    pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
Ian Barber's avatar
Ian Barber committed
68
        hwms_ [0], hwms_ [1], conflate_ [1]);
69 70 71 72 73 74 75 76 77
    alloc_assert (pipes_ [1]);

    pipes_ [0]->set_peer (pipes_ [1]);
    pipes_ [1]->set_peer (pipes_ [0]);

    return 0;
}

zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
      int inhwm_, int outhwm_, bool conflate_) :
    object_t (parent_),
    //  Underlying queues: one to read from, one to write to.
    inpipe (inpipe_),
    outpipe (outpipe_),
    //  Both directions start out enabled.
    in_active (true),
    out_active (true),
    //  The outbound HWM limits writes; the LWM is derived from the inbound HWM.
    hwm (outhwm_),
    lwm (compute_lwm (inhwm_)),
    //  Boosts start at -1; set_hwms treats a boost of exactly 0 as "unlimited"
    //  and ignores negative boosts.
    inhwmboost (-1),
    outhwmboost (-1),
    //  Message counters used for the HWM accounting.
    msgs_read (0),
    msgs_written (0),
    peers_msgs_read (0),
    //  Peer and event sink are attached later via set_peer / set_event_sink.
    peer (NULL),
    sink (NULL),
    state (active),
    delay (true),
    routing_id (0),
    conflate (conflate_)
{
}

zmq::pipe_t::~pipe_t ()
{
    //  Nothing to release here: the inbound queue is deallocated in
    //  process_pipe_term_ack before the object deletes itself.
}

void zmq::pipe_t::set_peer (pipe_t *peer_)
{
    //  Attaches the other end of the pipe. This may happen only once;
    //  a second call trips the assertion.
    zmq_assert (!peer);
    peer = peer_;
}

void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
{
    //  Registers the object that receives this pipe's event callbacks.
    //  As with the peer, the sink may be installed only once.
    zmq_assert (!sink);
    sink = sink_;
}

void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
{
    routing_id = routing_id_;
}

uint32_t zmq::pipe_t::get_routing_id ()
{
    return routing_id;
}

void zmq::pipe_t::set_identity (const blob_t &identity_)
129
{
130
    identity = identity_;
131 132
}

zmq::blob_t zmq::pipe_t::get_identity ()
{
    //  Returns the stored identity by value.
    return this->identity;
}

zmq::blob_t zmq::pipe_t::get_credential () const
{
    //  Returns, by value, the credential most recently captured by read ().
    return this->credential;
}

bool zmq::pipe_t::check_read ()
144
{
145 146 147
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
148 149
        return false;

150
    //  Check if there's an item in the pipe.
151 152
    if (!inpipe->check_read ()) {
        in_active = false;
153
        return false;
154
    }
155 156

    //  If the next item in the pipe is message delimiter,
157 158
    //  initiate termination process.
    if (inpipe->probe (is_delimiter)) {
159
        msg_t msg;
160
        bool ok = inpipe->read (&msg);
161
        zmq_assert (ok);
162
        process_delimiter ();
163 164 165 166
        return false;
    }

    return true;
167 168
}

bool zmq::pipe_t::read (msg_t *msg_)
{
    //  Reads the next message into msg_. Returns false when nothing can be
    //  read: the inbound side is inactive, the pipe is in a state that does
    //  not permit reading, the queue is empty, or the delimiter was reached.
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
        return false;

    //  Credentials are absorbed here rather than handed to the caller, so
    //  keep pulling from the queue until something else comes out.
    for (;;) {
        if (!inpipe->read (msg_)) {
            in_active = false;
            return false;
        }

        if (unlikely (msg_->is_credential ())) {
            //  Save a copy of the credential and fetch the next message.
            const unsigned char *data =
                static_cast <const unsigned char *> (msg_->data ());
            credential = blob_t (data, msg_->size ());
            const int rc = msg_->close ();
            zmq_assert (rc == 0);
            continue;
        }

        break;
    }

    //  If delimiter was read, start termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    //  Only the final part of a message counts, and identity messages
    //  are not counted at all.
    const bool last_part = !(msg_->flags () & msg_t::more);
    if (last_part && !msg_->is_identity ())
        msgs_read++;

    //  Whenever the read counter is a multiple of the low water mark,
    //  report it to the peer via activate_write.
    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}

bool zmq::pipe_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
207
{
208
    if (unlikely (!out_active || state != active))
209
        return false;
210

211
    bool full = !check_hwm();
Martin Sustrik's avatar
Martin Sustrik committed
212

213 214 215 216 217 218
    if (unlikely (full)) {
        out_active = false;
        return false;
    }

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
219 220
}

bool zmq::pipe_t::write (msg_t *msg_)
{
    //  Queues msg_ on the outbound pipe. Returns false, leaving the message
    //  untouched, if the pipe is currently not writable.
    if (unlikely (!check_write ()))
        return false;

    //  The flags are sampled before the message is handed to the outpipe.
    const bool more = (msg_->flags () & msg_t::more) != 0;
    const bool is_identity = msg_->is_identity ();
    outpipe->write (*msg_, more);

    //  Count only complete messages, and never identity messages.
    if (!(more || is_identity))
        msgs_written++;

    return true;
}

void zmq::pipe_t::rollback ()
Martin Sustrik's avatar
Martin Sustrik committed
236
{
237 238
    //  Remove incomplete message from the outbound pipe.
    msg_t msg;
239
    if (outpipe) {
240 241 242 243 244
        while (outpipe->unwrite (&msg)) {
            zmq_assert (msg.flags () & msg_t::more);
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
245
    }
Martin Sustrik's avatar
Martin Sustrik committed
246 247
}

void zmq::pipe_t::flush ()
Martin Sustrik's avatar
Martin Sustrik committed
249
{
250
    //  The peer does not exist anymore at this point.
251
    if (state == term_ack_sent)
252 253
        return;

254
    if (outpipe && !outpipe->flush ())
255
        send_activate_read (peer);
Martin Sustrik's avatar
Martin Sustrik committed
256 257
}

void zmq::pipe_t::process_activate_read ()
unknown's avatar
unknown committed
259
{
260
    if (!in_active && (state == active || state == waiting_for_delimiter)) {
261 262 263
        in_active = true;
        sink->read_activated (this);
    }
unknown's avatar
unknown committed
264 265
}

void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
Martin Sustrik's avatar
Martin Sustrik committed
267
{
268
    //  Remember the peer's message sequence number.
269
    peers_msgs_read = msgs_read_;
270

271
    if (!out_active && state == active) {
272 273
        out_active = true;
        sink->write_activated (this);
Martin Hurton's avatar
Martin Hurton committed
274
    }
Martin Sustrik's avatar
Martin Sustrik committed
275 276
}

void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
    zmq_assert (outpipe);
    outpipe->flush ();
    msg_t msg;
    while (outpipe->read (&msg)) {
285 286
       if (!(msg.flags () & msg_t::more))
            msgs_written--;
287 288 289
       int rc = msg.close ();
       errno_assert (rc == 0);
    }
290
    LIBZMQ_DELETE(outpipe);
291 292 293 294 295 296 297 298 299 300 301

    //  Plug in the new outpipe.
    zmq_assert (pipe_);
    outpipe = (upipe_t*) pipe_;
    out_active = true;

    //  If appropriate, notify the user about the hiccup.
    if (state == active)
        sink->hiccuped (this);
}

void zmq::pipe_t::process_pipe_term ()
Martin Sustrik's avatar
Martin Sustrik committed
303
{
Martin Hurton's avatar
Martin Hurton committed
304 305 306 307
    zmq_assert (state == active
            ||  state == delimiter_received
            ||  state == term_req_sent1);

308 309
    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
310 311 312
    //  pending messages, we can move directly to the term_ack_sent state.
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
    //  pending messages are read.
313
    if (state == active) {
Martin Hurton's avatar
Martin Hurton committed
314 315 316
        if (delay)
            state = waiting_for_delimiter;
        else {
317
            state = term_ack_sent;
318
            outpipe = NULL;
319 320 321 322 323
            send_pipe_term_ack (peer);
        }
    }

    //  Delimiter happened to arrive before the term command. Now we have the
324
    //  term command as well, so we can move straight to term_ack_sent state.
Martin Hurton's avatar
Martin Hurton committed
325
    else
326 327
    if (state == delimiter_received) {
        state = term_ack_sent;
328
        outpipe = NULL;
329 330 331 332 333 334
        send_pipe_term_ack (peer);
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
Martin Hurton's avatar
Martin Hurton committed
335
    else
336 337
    if (state == term_req_sent1) {
        state = term_req_sent2;
338
        outpipe = NULL;
339 340
        send_pipe_term_ack (peer);
    }
Martin Sustrik's avatar
Martin Sustrik committed
341 342
}

void zmq::pipe_t::process_pipe_term_ack ()
344
{
345 346
    //  Notify the user that all the references to the pipe should be dropped.
    zmq_assert (sink);
347
    sink->pipe_terminated (this);
348

349 350 351 352 353
    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
    //  the peer before deallocating this side of the pipe.
    //  All the other states are invalid.
    if (state == term_req_sent1) {
354
        outpipe = NULL;
355
        send_pipe_term_ack (peer);
356
    }
357
    else
358
        zmq_assert (state == term_ack_sent || state == term_req_sent2);
359 360 361 362 363 364

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.
365

danielkr's avatar
danielkr committed
366
    if (!conflate) {
367 368 369 370 371
        msg_t msg;
        while (inpipe->read (&msg)) {
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
Martin Hurton's avatar
Martin Hurton committed
372
    }
373

374
    LIBZMQ_DELETE(inpipe);
375

376 377
    //  Deallocate the pipe object
    delete this;
Martin Sustrik's avatar
Martin Sustrik committed
378 379
}

void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
{
    //  Handles the pipe_hwm command by applying the new water marks locally.
    set_hwms (inhwm_, outhwm_);
}

void zmq::pipe_t::set_nodelay ()
{
    this->delay = false;
}

void zmq::pipe_t::terminate (bool delay_)
Martin Sustrik's avatar
Martin Sustrik committed
391
{
392 393 394
    //  Overload the value specified at pipe creation.
    delay = delay_;

395
    //  If terminate was already called, we can ignore the duplicate invocation.
396
    if (state == term_req_sent1 || state == term_req_sent2) {
397
        return;
398
    }
399 400
    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
401
    else if (state == term_ack_sent) {
402
        return;
403
    }
404 405
    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
406
    else if (state == active) {
407
        send_pipe_term (peer);
408
        state = term_req_sent1;
409 410 411
    }
    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
412
    else if (state == waiting_for_delimiter && !delay) {
413 414
        //  Drop any unfinished outbound messages.
        rollback ();
Martin Hurton's avatar
Martin Hurton committed
415 416
        outpipe = NULL;
        send_pipe_term_ack (peer);
417
        state = term_ack_sent;
418
    }
Sergey M․'s avatar
Sergey M․ committed
419
    //  If there are pending messages still available, do nothing.
420
    else if (state == waiting_for_delimiter) {
421 422 423
    }
    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
Martin Hurton's avatar
Martin Hurton committed
424
    //  active state.
425
    else if (state == delimiter_received) {
426
        send_pipe_term (peer);
427
        state = term_req_sent1;
428 429
    }
    //  There are no other states.
430
    else {
431
        zmq_assert (false);
432
    }
Martin Sustrik's avatar
Martin Sustrik committed
433

434
    //  Stop outbound flow of messages.
435
    out_active = false;
Martin Hurton's avatar
Martin Hurton committed
436

437
    if (outpipe) {
438

439 440
        //  Drop any unfinished outbound messages.
        rollback ();
441

442 443 444 445 446 447
        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
        outpipe->write (msg, false);
        flush ();
448
    }
Martin Sustrik's avatar
Martin Sustrik committed
449 450
}

bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
Martin Sustrik's avatar
Martin Sustrik committed
452
{
453
    return msg_.is_delimiter ();
Martin Sustrik's avatar
Martin Sustrik committed
454
}

int zmq::pipe_t::compute_lwm (int hwm_)
{
    //  Computes the low water mark for the given high water mark. The
    //  following points should be taken into consideration:
    //
    //  1. LWM should not exceed HWM.
    //  2. LWM cannot be set to very low value (such as zero) as after filling
    //     the queue it would start to refill only after all the messages are
    //     read from it and thus unnecessarily hold the progress back.
    //  3. LWM cannot be set to very high value (such as HWM-1) as it would
    //     result in lock-step filling of the queue - if a single message is
    //     read from a full queue, writer thread is resumed to write exactly one
    //     message to the queue and go back to sleep immediately. This would
    //     result in low performance.
    //
    //  Given the 3. it would be good to keep HWM and LWM as far apart as
    //  possible to reduce the thread switching overhead to almost zero.
    //  Let's make LWM 1/2 of HWM, rounded up.
    //
    //  For positive values, hwm_ - hwm_ / 2 equals (hwm_ + 1) / 2 but cannot
    //  overflow, whereas hwm_ + 1 is undefined behaviour for hwm_ == INT_MAX.
    if (hwm_ > 0)
        return hwm_ - hwm_ / 2;

    //  Zero and negative inputs cannot overflow; keep the original formula
    //  so that the results stay exactly the same.
    return (hwm_ + 1) / 2;
}

void zmq::pipe_t::process_delimiter ()
480
{
481 482
    zmq_assert (state == active
            ||  state == waiting_for_delimiter);
483

484 485 486
    if (state == active)
        state = delimiter_received;
    else {
487
        outpipe = NULL;
488
        send_pipe_term_ack (peer);
489
        state = term_ack_sent;
490 491
    }
}

void zmq::pipe_t::hiccup ()
{
    //  If termination is already under way do nothing.
    if (state != active)
        return;

    //  We'll drop the pointer to the inpipe. From now on, the peer is
    //  responsible for deallocating it.
    inpipe = NULL;

    //  Create new inpipe.
504
    if (conflate)
505
        inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
506
    else
507
        inpipe = new (std::nothrow)ypipe_t <msg_t, message_pipe_granularity>();
508

509 510 511 512 513 514 515
    alloc_assert (inpipe);
    in_active = true;

    //  Notify the peer about the hiccup.
    send_hiccup (peer, (void*) inpipe);
}

void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
518 519
    int in = inhwm_ + (inhwmboost > 0 ? inhwmboost : 0);
    int out = outhwm_ + (outhwmboost > 0 ? outhwmboost : 0);
520 521

    // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
522
    if (inhwm_ <= 0 || inhwmboost == 0)
523 524
        in = 0;

525
    if (outhwm_ <= 0 || outhwmboost == 0)
526
        out = 0;
527

528 529
    lwm = compute_lwm(in);
    hwm = out;
530 531 532 533 534 535
}

void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
{
    inhwmboost = inhwmboost_;
    outhwmboost = outhwmboost_;
536
}

bool zmq::pipe_t::check_hwm () const
539
{
540
    bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
541 542
    return( !full );
}

void zmq::pipe_t::send_hwms_to_peer(int inhwm_, int outhwm_)
{
    //  Sends a pipe_hwm command carrying the given values to the peer; the
    //  handler for that command is process_pipe_hwm.
    send_pipe_hwm (peer, inhwm_, outhwm_);
}