/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <stddef.h>
33

34
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37

38 39 40
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"

41 42 43 44
int zmq::pipepair (class object_t *parents_[2],
                   class pipe_t *pipes_[2],
                   int hwms_[2],
                   bool conflate_[2])
45 46 47 48
{
    //   Creates two pipe objects. These objects are connected by two ypipes,
    //   each to pass messages in one direction.

49 50
    typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
    typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
51 52

    pipe_t::upipe_t *upipe1;
53
    if (conflate_[0])
54 55 56
        upipe1 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe1 = new (std::nothrow) upipe_normal_t ();
57
    alloc_assert (upipe1);
58 59

    pipe_t::upipe_t *upipe2;
60
    if (conflate_[1])
61 62 63
        upipe2 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe2 = new (std::nothrow) upipe_normal_t ();
64 65
    alloc_assert (upipe2);

66 67 68 69 70 71
    pipes_[0] = new (std::nothrow)
      pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
    alloc_assert (pipes_[0]);
    pipes_[1] = new (std::nothrow)
      pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
    alloc_assert (pipes_[1]);
72

73 74
    pipes_[0]->set_peer (pipes_[1]);
    pipes_[1]->set_peer (pipes_[0]);
75 76 77 78

    return 0;
}

79 80 81 82 83 84 85 86 87 88 89 90
void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_)
{
    //  Copy the routing id configured in the options into a fresh message
    //  and mark it with the routing_id flag.
    zmq::msg_t routing_id_msg;
    const int init_rc = routing_id_msg.init_size (options_.routing_id_size);
    errno_assert (init_rc == 0);
    memcpy (routing_id_msg.data (), options_.routing_id,
            options_.routing_id_size);
    routing_id_msg.set_flags (zmq::msg_t::routing_id);

    //  Push it into the pipe; the write is required to succeed.
    const bool written = pipe_->write (&routing_id_msg);
    zmq_assert (written);
    pipe_->flush ();
}

91 92 93 94 95 96
//  Constructs one endpoint of a pipe pair.
//  parent_   - passed on to the object_t base.
//  inpipe_   - queue this endpoint reads from.
//  outpipe_  - queue this endpoint writes to.
//  inhwm_    - inbound high water mark; only used to derive the low water
//              mark.
//  outhwm_   - outbound high water mark.
//  conflate_ - stored as the conflate flag of this endpoint.
zmq::pipe_t::pipe_t (object_t *parent_,
                     upipe_t *inpipe_,
                     upipe_t *outpipe_,
                     int inhwm_,
                     int outhwm_,
                     bool conflate_) :
    object_t (parent_),
    _in_pipe (inpipe_),
    _out_pipe (outpipe_),
    //  Both directions start out enabled.
    _in_active (true),
    _out_active (true),
    _hwm (outhwm_),
    _lwm (compute_lwm (inhwm_)),
    //  A negative boost contributes nothing in set_hwms.
    _in_hwm_boost (-1),
    _out_hwm_boost (-1),
    //  Message counters used for the watermark bookkeeping.
    _msgs_read (0),
    _msgs_written (0),
    _peers_msgs_read (0),
    //  Peer and event sink are attached later, once each.
    _peer (NULL),
    _sink (NULL),
    _state (active),
    _delay (true),
    _server_socket_routing_id (0),
    _conflate (conflate_)
{
}

118
zmq::pipe_t::~pipe_t ()
{
    //  Nothing to release here; the inbound queue is deleted in
    //  process_pipe_term_ack before the object deletes itself.
}
Martin Sustrik's avatar
Martin Sustrik committed
121

122
void zmq::pipe_t::set_peer (pipe_t *peer_)
{
    //  The opposite endpoint may be attached only once; a second attempt
    //  trips the assertion.
    zmq_assert (!_peer);
    this->_peer = peer_;
}

129
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
{
    //  The receiver of pipe events may be registered only once; a second
    //  attempt trips the assertion.
    zmq_assert (!_sink);
    this->_sink = sink_;
}

136 137
void zmq::pipe_t::set_server_socket_routing_id (
  uint32_t server_socket_routing_id_)
138
{
139
    _server_socket_routing_id = server_socket_routing_id_;
140 141
}

142
uint32_t zmq::pipe_t::get_server_socket_routing_id () const
143
{
144
    return _server_socket_routing_id;
145 146
}

147 148
void zmq::pipe_t::set_router_socket_routing_id (
  const blob_t &router_socket_routing_id_)
149
{
150
    _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
151 152
}

153
const zmq::blob_t &zmq::pipe_t::get_routing_id () const
{
    //  Reference to the blob filled in by set_router_socket_routing_id.
    return this->_router_socket_routing_id;
}

158
bool zmq::pipe_t::check_read ()
159
{
160
    if (unlikely (!_in_active))
161
        return false;
162
    if (unlikely (_state != active && _state != waiting_for_delimiter))
163 164
        return false;

165
    //  Check if there's an item in the pipe.
166 167
    if (!_in_pipe->check_read ()) {
        _in_active = false;
168
        return false;
169
    }
170 171

    //  If the next item in the pipe is message delimiter,
172
    //  initiate termination process.
173
    if (_in_pipe->probe (is_delimiter)) {
174
        msg_t msg;
175
        const bool ok = _in_pipe->read (&msg);
176
        zmq_assert (ok);
177
        process_delimiter ();
178 179 180 181
        return false;
    }

    return true;
182 183
}

184
bool zmq::pipe_t::read (msg_t *msg_)
{
    //  Reads the next message into msg_. Returns false when nothing could be
    //  delivered: inbound side inactive, wrong state, empty queue, or the
    //  delimiter was encountered.
    if (unlikely (!_in_active))
        return false;
    if (unlikely (_state != active && _state != waiting_for_delimiter))
        return false;

    //  Keep fetching until something other than a credential shows up.
    for (;;) {
        if (!_in_pipe->read (msg_)) {
            _in_active = false;
            return false;
        }

        //  Credentials are closed and skipped.
        if (unlikely (msg_->is_credential ())) {
            const int rc = msg_->close ();
            zmq_assert (rc == 0);
            continue;
        }
        break;
    }

    //  The delimiter is never handed to the caller; it starts the
    //  termination handling instead.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    //  Only the final frame of a message counts, and routing id frames are
    //  not counted at all.
    const bool final_frame = (msg_->flags () & msg_t::more) == 0;
    if (final_frame && !msg_->is_routing_id ())
        ++_msgs_read;

    //  Report the read count to the peer whenever it is a multiple of the
    //  low water mark.
    if (_lwm > 0 && _msgs_read % _lwm == 0)
        send_activate_write (_peer, _msgs_read);

    return true;
}

220
bool zmq::pipe_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
221
{
222
    if (unlikely (!_out_active || _state != active))
223
        return false;
224

225
    const bool full = !check_hwm ();
Martin Sustrik's avatar
Martin Sustrik committed
226

227
    if (unlikely (full)) {
228
        _out_active = false;
229 230 231 232
        return false;
    }

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
233 234
}

235
bool zmq::pipe_t::write (msg_t *msg_)
{
    //  Queues msg_ on the outbound queue. Returns false, leaving the message
    //  untouched, when check_write refuses the write.
    if (unlikely (!check_write ()))
        return false;

    //  Sample the properties needed for accounting before the message is
    //  handed to the queue.
    const bool has_more = (msg_->flags () & msg_t::more) != 0;
    const bool routing_id_frame = msg_->is_routing_id ();

    _out_pipe->write (*msg_, has_more);

    //  Count whole messages only; routing id frames are not counted.
    if (!(has_more || routing_id_frame))
        ++_msgs_written;

    return true;
}

249
void zmq::pipe_t::rollback () const
Martin Sustrik's avatar
Martin Sustrik committed
250
{
251 252
    //  Remove incomplete message from the outbound pipe.
    msg_t msg;
253 254
    if (_out_pipe) {
        while (_out_pipe->unwrite (&msg)) {
255
            zmq_assert (msg.flags () & msg_t::more);
256
            const int rc = msg.close ();
257 258
            errno_assert (rc == 0);
        }
259
    }
Martin Sustrik's avatar
Martin Sustrik committed
260 261
}

262
void zmq::pipe_t::flush ()
Martin Sustrik's avatar
Martin Sustrik committed
263
{
264
    //  The peer does not exist anymore at this point.
265
    if (_state == term_ack_sent)
266 267
        return;

268 269
    if (_out_pipe && !_out_pipe->flush ())
        send_activate_read (_peer);
Martin Sustrik's avatar
Martin Sustrik committed
270 271
}

272
void zmq::pipe_t::process_activate_read ()
unknown's avatar
unknown committed
273
{
274 275 276
    if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
        _in_active = true;
        _sink->read_activated (this);
277
    }
unknown's avatar
unknown committed
278 279
}

280
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
Martin Sustrik's avatar
Martin Sustrik committed
281
{
282
    //  Remember the peer's message sequence number.
283
    _peers_msgs_read = msgs_read_;
284

285 286 287
    if (!_out_active && _state == active) {
        _out_active = true;
        _sink->write_activated (this);
Martin Hurton's avatar
Martin Hurton committed
288
    }
Martin Sustrik's avatar
Martin Sustrik committed
289 290
}

291 292 293 294
void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
295 296
    zmq_assert (_out_pipe);
    _out_pipe->flush ();
297
    msg_t msg;
298
    while (_out_pipe->read (&msg)) {
299
        if (!(msg.flags () & msg_t::more))
300
            _msgs_written--;
301
        const int rc = msg.close ();
302
        errno_assert (rc == 0);
303
    }
304
    LIBZMQ_DELETE (_out_pipe);
305 306 307

    //  Plug in the new outpipe.
    zmq_assert (pipe_);
308 309
    _out_pipe = static_cast<upipe_t *> (pipe_);
    _out_active = true;
310 311

    //  If appropriate, notify the user about the hiccup.
312 313
    if (_state == active)
        _sink->hiccuped (this);
314 315
}

316
void zmq::pipe_t::process_pipe_term ()
Martin Sustrik's avatar
Martin Sustrik committed
317
{
318 319
    zmq_assert (_state == active || _state == delimiter_received
                || _state == term_req_sent1);
Martin Hurton's avatar
Martin Hurton committed
320

321 322
    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
323 324 325
    //  pending messages, we can move directly to the term_ack_sent state.
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
    //  pending messages are read.
326 327 328
    if (_state == active) {
        if (_delay)
            _state = waiting_for_delimiter;
Martin Hurton's avatar
Martin Hurton committed
329
        else {
330 331 332
            _state = term_ack_sent;
            _out_pipe = NULL;
            send_pipe_term_ack (_peer);
333 334 335 336
        }
    }

    //  Delimiter happened to arrive before the term command. Now we have the
337
    //  term command as well, so we can move straight to term_ack_sent state.
338 339 340 341
    else if (_state == delimiter_received) {
        _state = term_ack_sent;
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
342 343 344 345 346
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
347 348 349 350
    else if (_state == term_req_sent1) {
        _state = term_req_sent2;
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
351
    }
Martin Sustrik's avatar
Martin Sustrik committed
352 353
}

354
void zmq::pipe_t::process_pipe_term_ack ()
355
{
356
    //  Notify the user that all the references to the pipe should be dropped.
357 358
    zmq_assert (_sink);
    _sink->pipe_terminated (this);
359

360 361 362 363
    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
    //  the peer before deallocating this side of the pipe.
    //  All the other states are invalid.
364 365 366
    if (_state == term_req_sent1) {
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
367
    } else
368
        zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
369 370 371 372 373 374

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.
375

376
    if (!_conflate) {
377
        msg_t msg;
378
        while (_in_pipe->read (&msg)) {
379
            const int rc = msg.close ();
380 381
            errno_assert (rc == 0);
        }
Martin Hurton's avatar
Martin Hurton committed
382
    }
383

384
    LIBZMQ_DELETE (_in_pipe);
385

386 387
    //  Deallocate the pipe object
    delete this;
Martin Sustrik's avatar
Martin Sustrik committed
388 389
}

390 391
void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
{
392
    set_hwms (inhwm_, outhwm_);
393 394
}

Ian Barber's avatar
Ian Barber committed
395 396
void zmq::pipe_t::set_nodelay ()
{
397
    this->_delay = false;
Ian Barber's avatar
Ian Barber committed
398 399
}

400
void zmq::pipe_t::terminate (bool delay_)
Martin Sustrik's avatar
Martin Sustrik committed
401
{
402
    //  Overload the value specified at pipe creation.
403
    _delay = delay_;
404

405
    //  If terminate was already called, we can ignore the duplicate invocation.
406
    if (_state == term_req_sent1 || _state == term_req_sent2) {
407
        return;
408
    }
409 410
    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
411
    if (_state == term_ack_sent) {
412
        return;
413
    }
414 415
    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
416
    if (_state == active) {
417 418
        send_pipe_term (_peer);
        _state = term_req_sent1;
419 420 421
    }
    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
422
    else if (_state == waiting_for_delimiter && !_delay) {
423 424
        //  Drop any unfinished outbound messages.
        rollback ();
425 426 427
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
        _state = term_ack_sent;
428
    }
Sergey M․'s avatar
Sergey M․ committed
429
    //  If there are pending messages still available, do nothing.
430
    else if (_state == waiting_for_delimiter) {
431 432 433
    }
    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
Martin Hurton's avatar
Martin Hurton committed
434
    //  active state.
435 436 437
    else if (_state == delimiter_received) {
        send_pipe_term (_peer);
        _state = term_req_sent1;
438 439
    }
    //  There are no other states.
440
    else {
441
        zmq_assert (false);
442
    }
Martin Sustrik's avatar
Martin Sustrik committed
443

444
    //  Stop outbound flow of messages.
445
    _out_active = false;
Martin Hurton's avatar
Martin Hurton committed
446

447
    if (_out_pipe) {
448 449
        //  Drop any unfinished outbound messages.
        rollback ();
450

451 452 453 454
        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
455
        _out_pipe->write (msg, false);
456
        flush ();
457
    }
Martin Sustrik's avatar
Martin Sustrik committed
458 459
}

460
bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
Martin Sustrik's avatar
Martin Sustrik committed
461
{
462
    return msg_.is_delimiter ();
Martin Sustrik's avatar
Martin Sustrik committed
463
}
464

465
int zmq::pipe_t::compute_lwm (int hwm_)
{
    //  Compute the low water mark for the given high water mark. The
    //  following points should be taken into consideration:
    //
    //  1. LWM has to be less than HWM.
    //  2. LWM cannot be set to very low value (such as zero) as after filling
    //     the queue it would start to refill only after all the messages are
    //     read from it and thus unnecessarily hold the progress back.
    //  3. LWM cannot be set to very high value (such as HWM-1) as it would
    //     result in lock-step filling of the queue - if a single message is
    //     read from a full queue, writer thread is resumed to write exactly one
    //     message to the queue and go back to sleep immediately. This would
    //     result in low performance.
    //
    //  Given the 3. it would be good to keep HWM and LWM as far apart as
    //  possible to reduce the thread switching overhead to almost zero.
    //  Let's make LWM 1/2 of HWM, rounded up.
    //
    //  The sum is formed in a 64-bit type: with plain int arithmetic
    //  'hwm_ + 1' overflows (undefined behaviour) when hwm_ is the largest
    //  int value. The quotient always fits back into an int, and for every
    //  other input the result is the same as that of '(hwm_ + 1) / 2'.
    const int64_t widened_hwm = static_cast<int64_t> (hwm_);
    const int result = static_cast<int> ((widened_hwm + 1) / 2);

    return result;
}
487

488
void zmq::pipe_t::process_delimiter ()
489
{
490
    zmq_assert (_state == active || _state == waiting_for_delimiter);
491

492 493
    if (_state == active)
        _state = delimiter_received;
494
    else {
495 496 497
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
        _state = term_ack_sent;
498 499
    }
}
500 501 502 503

void zmq::pipe_t::hiccup ()
{
    //  If termination is already under way do nothing.
504
    if (_state != active)
505 506 507 508 509 510
        return;

    //  We'll drop the pointer to the inpipe. From now on, the peer is
    //  responsible for deallocating it.

    //  Create new inpipe.
511 512 513 514
    _in_pipe =
      _conflate
        ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ())
        : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
515

516 517
    alloc_assert (_in_pipe);
    _in_active = true;
518 519

    //  Notify the peer about the hiccup.
520
    send_hiccup (_peer, _in_pipe);
521 522
}

523 524
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
525 526
    int in = inhwm_ + std::max (_in_hwm_boost, 0);
    int out = outhwm_ + std::max (_out_hwm_boost, 0);
527 528

    // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
529
    if (inhwm_ <= 0 || _in_hwm_boost == 0)
530 531
        in = 0;

532
    if (outhwm_ <= 0 || _out_hwm_boost == 0)
533
        out = 0;
534

535 536
    _lwm = compute_lwm (in);
    _hwm = out;
537 538
}

539
void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
540
{
541 542
    _in_hwm_boost = inhwmboost_;
    _out_hwm_boost = outhwmboost_;
543
}
544

Martin Hurton's avatar
Martin Hurton committed
545
bool zmq::pipe_t::check_hwm () const
546
{
547 548
    const bool full =
      _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
549
    return !full;
550
}
551

552
void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
553
{
554
    send_pipe_hwm (_peer, inhwm_, outhwm_);
555
}
556

557
void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
{
    //  The argument is taken by value and passed through ZMQ_MOVE into the
    //  member.
    this->_endpoint_pair = ZMQ_MOVE (endpoint_pair_);
}

562
const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
{
    //  Reference to the pair stored by set_endpoint_pair.
    return this->_endpoint_pair;
}
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581

void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
{
    //  Sends this side's queue statistics to the peer.
    //  socket_base_ is passed through to the command unchanged.

    //  The command carries its own heap copy of the endpoint pair. The
    //  allocation is checked like every other nothrow allocation in this
    //  file, so that a null pointer is never put into the command.
    endpoint_uri_pair_t *ep =
      new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
    alloc_assert (ep);

    //  The count sent is the number of messages written that the peer has
    //  not yet reported as read.
    send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
                          ep);
}

void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
                                           own_t *socket_base_,
                                           endpoint_uri_pair_t *endpoint_pair_)
{
    send_pipe_stats_publish (socket_base_, queue_count_,
                             _msgs_written - _peers_msgs_read, endpoint_pair_);
}