pipe.cpp 14.6 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32
#include <stddef.h>
33

34
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "pipe.hpp"
36
#include "err.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
37

38 39 40
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"

41
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
Ian Barber's avatar
Ian Barber committed
42
    int hwms_ [2], bool conflate_ [2])
43 44 45 46
{
    //   Creates two pipe objects. These objects are connected by two ypipes,
    //   each to pass messages in one direction.

47 48
    typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
    typedef ypipe_conflate_t <msg_t> upipe_conflate_t;
49 50 51 52 53 54

    pipe_t::upipe_t *upipe1;
    if(conflate_ [0])
        upipe1 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe1 = new (std::nothrow) upipe_normal_t ();
55
    alloc_assert (upipe1);
56 57 58 59 60 61

    pipe_t::upipe_t *upipe2;
    if(conflate_ [1])
        upipe2 = new (std::nothrow) upipe_conflate_t ();
    else
        upipe2 = new (std::nothrow) upipe_normal_t ();
62 63 64
    alloc_assert (upipe2);

    pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
Ian Barber's avatar
Ian Barber committed
65
        hwms_ [1], hwms_ [0], conflate_ [0]);
66 67
    alloc_assert (pipes_ [0]);
    pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
Ian Barber's avatar
Ian Barber committed
68
        hwms_ [0], hwms_ [1], conflate_ [1]);
69 70 71 72 73 74 75 76 77
    alloc_assert (pipes_ [1]);

    pipes_ [0]->set_peer (pipes_ [1]);
    pipes_ [1]->set_peer (pipes_ [0]);

    return 0;
}

//  Constructs one endpoint of a pipe pair. The endpoint reads from inpipe_
//  and writes to outpipe_. The high watermark is set to outhwm_ and the low
//  watermark is derived from inhwm_. Both directions start out active, no
//  peer or event sink is attached yet, both HWM boosts are zero and delayed
//  termination is switched on.
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
      int inhwm_, int outhwm_, bool conflate_) :
    object_t (parent_),
    inpipe (inpipe_),
    outpipe (outpipe_),
    in_active (true),
    out_active (true),
    hwm (outhwm_),
    lwm (compute_lwm (inhwm_)),
    inhwmboost (0),
    outhwmboost (0),
    msgs_read (0),
    msgs_written (0),
    peers_msgs_read (0),
    peer (NULL),
    sink (NULL),
    state (active),
    delay (true),
    routing_id (0),
    conflate (conflate_)
{
}

100
//  Nothing to release here; the destructor body is intentionally empty.
zmq::pipe_t::~pipe_t ()
{
}
Martin Sustrik's avatar
Martin Sustrik committed
103

104
//  Attaches the opposite endpoint of the pipe pair.
void zmq::pipe_t::set_peer (pipe_t *peer_)
{
    //  The peer may be assigned only once; doing it twice trips the assert.
    zmq_assert (peer == NULL);
    peer = peer_;
}

111
//  Registers the object that receives this pipe's event callbacks.
void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
{
    //  The sink may be assigned only once; doing it twice trips the assert.
    zmq_assert (sink == NULL);
    sink = sink_;
}

118 119 120 121 122 123 124 125 126 127
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
{
    routing_id = routing_id_;
}

uint32_t zmq::pipe_t::get_routing_id ()
{
    return routing_id;
}

128
void zmq::pipe_t::set_identity (const blob_t &identity_)
129
{
130
    identity = identity_;
131 132
}

133
//  Returns a copy of the identity blob stored in the pipe.
zmq::blob_t zmq::pipe_t::get_identity ()
{
    return this->identity;
}

138 139 140 141 142
//  Returns a copy of the credential blob stored in the pipe.
zmq::blob_t zmq::pipe_t::get_credential () const
{
    return this->credential;
}

143
bool zmq::pipe_t::check_read ()
144
{
145 146 147
    if (unlikely (!in_active))
        return false;
    if (unlikely (state != active && state != waiting_for_delimiter))
148 149
        return false;

150
    //  Check if there's an item in the pipe.
151 152
    if (!inpipe->check_read ()) {
        in_active = false;
153
        return false;
154
    }
155 156

    //  If the next item in the pipe is message delimiter,
157 158
    //  initiate termination process.
    if (inpipe->probe (is_delimiter)) {
159
        msg_t msg;
160
        bool ok = inpipe->read (&msg);
161
        zmq_assert (ok);
162
        process_delimiter ();
163 164 165 166
        return false;
    }

    return true;
167 168
}

169
//  Reads one message from the pipe into msg_. Returns false when nothing
//  can be read (inbound side inactive, wrong state, pipe empty) or when the
//  delimiter was encountered. Credential messages are stored and skipped.
bool zmq::pipe_t::read (msg_t *msg_)
{
    const bool readable_state =
        (state == active || state == waiting_for_delimiter);
    if (unlikely (!in_active) || unlikely (!readable_state))
        return false;

    //  Fetch messages until something other than a credential shows up.
    for (;;) {
        if (!inpipe->read (msg_)) {
            in_active = false;
            return false;
        }

        if (unlikely (msg_->is_credential ())) {
            //  Keep a copy of the credential, dispose of the message and
            //  go on to the next one.
            const unsigned char *bytes =
                static_cast <const unsigned char *> (msg_->data ());
            credential = blob_t (bytes, msg_->size ());
            const int rc = msg_->close ();
            zmq_assert (rc == 0);
            continue;
        }

        break;
    }

    //  The delimiter is never handed to the caller; it starts the
    //  termination process of the pipe.
    if (msg_->is_delimiter ()) {
        process_delimiter ();
        return false;
    }

    //  Count only the final part of a message, and never identity messages.
    const bool has_more = (msg_->flags () & msg_t::more) != 0;
    if (!has_more && !msg_->is_identity ())
        msgs_read++;

    //  Each time the read counter is a multiple of the low watermark,
    //  report it to the peer.
    if (lwm > 0 && msgs_read % lwm == 0)
        send_activate_write (peer, msgs_read);

    return true;
}

206
bool zmq::pipe_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
207
{
208
    if (unlikely (!out_active || state != active))
209
        return false;
210

211
    bool full = !check_hwm();
Martin Sustrik's avatar
Martin Sustrik committed
212

213 214 215 216 217 218
    if (unlikely (full)) {
        out_active = false;
        return false;
    }

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
219 220
}

221
//  Writes msg_ to the outbound pipe. Returns false, leaving the message
//  untouched, if the pipe cannot be written to at the moment.
bool zmq::pipe_t::write (msg_t *msg_)
{
    if (unlikely (!check_write ()))
        return false;

    //  Both properties are sampled before the message is handed over to
    //  the outbound pipe.
    const bool more = (msg_->flags () & msg_t::more) != 0;
    const bool is_identity = msg_->is_identity ();
    outpipe->write (*msg_, more);

    //  Only the final part of a non-identity message is counted.
    if (!(more || is_identity))
        msgs_written++;

    return true;
}

235
void zmq::pipe_t::rollback ()
Martin Sustrik's avatar
Martin Sustrik committed
236
{
237 238
    //  Remove incomplete message from the outbound pipe.
    msg_t msg;
239
    if (outpipe) {
240 241 242 243 244
        while (outpipe->unwrite (&msg)) {
            zmq_assert (msg.flags () & msg_t::more);
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
245
    }
Martin Sustrik's avatar
Martin Sustrik committed
246 247
}

248
void zmq::pipe_t::flush ()
Martin Sustrik's avatar
Martin Sustrik committed
249
{
250
    //  The peer does not exist anymore at this point.
251
    if (state == term_ack_sent)
252 253
        return;

254
    if (outpipe && !outpipe->flush ())
255
        send_activate_read (peer);
Martin Sustrik's avatar
Martin Sustrik committed
256 257
}

258
void zmq::pipe_t::process_activate_read ()
unknown's avatar
unknown committed
259
{
260
    if (!in_active && (state == active || state == waiting_for_delimiter)) {
261 262 263
        in_active = true;
        sink->read_activated (this);
    }
unknown's avatar
unknown committed
264 265
}

266
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
Martin Sustrik's avatar
Martin Sustrik committed
267
{
268
    //  Remember the peer's message sequence number.
269
    peers_msgs_read = msgs_read_;
270

271
    if (!out_active && state == active) {
272 273
        out_active = true;
        sink->write_activated (this);
Martin Hurton's avatar
Martin Hurton committed
274
    }
Martin Sustrik's avatar
Martin Sustrik committed
275 276
}

277 278 279 280 281 282 283 284
void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
    zmq_assert (outpipe);
    outpipe->flush ();
    msg_t msg;
    while (outpipe->read (&msg)) {
285 286
       if (!(msg.flags () & msg_t::more))
            msgs_written--;
287 288 289
       int rc = msg.close ();
       errno_assert (rc == 0);
    }
290
    LIBZMQ_DELETE(outpipe);
291 292 293 294 295 296 297 298 299 300 301

    //  Plug in the new outpipe.
    zmq_assert (pipe_);
    outpipe = (upipe_t*) pipe_;
    out_active = true;

    //  If appropriate, notify the user about the hiccup.
    if (state == active)
        sink->hiccuped (this);
}

302
void zmq::pipe_t::process_pipe_term ()
Martin Sustrik's avatar
Martin Sustrik committed
303
{
Martin Hurton's avatar
Martin Hurton committed
304 305 306 307
    zmq_assert (state == active
            ||  state == delimiter_received
            ||  state == term_req_sent1);

308 309
    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
310 311 312
    //  pending messages, we can move directly to the term_ack_sent state.
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
    //  pending messages are read.
313
    if (state == active) {
Martin Hurton's avatar
Martin Hurton committed
314 315 316
        if (delay)
            state = waiting_for_delimiter;
        else {
317
            state = term_ack_sent;
318
            outpipe = NULL;
319 320 321 322 323
            send_pipe_term_ack (peer);
        }
    }

    //  Delimiter happened to arrive before the term command. Now we have the
324
    //  term command as well, so we can move straight to term_ack_sent state.
Martin Hurton's avatar
Martin Hurton committed
325
    else
326 327
    if (state == delimiter_received) {
        state = term_ack_sent;
328
        outpipe = NULL;
329 330 331 332 333 334
        send_pipe_term_ack (peer);
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
Martin Hurton's avatar
Martin Hurton committed
335
    else
336 337
    if (state == term_req_sent1) {
        state = term_req_sent2;
338
        outpipe = NULL;
339 340
        send_pipe_term_ack (peer);
    }
Martin Sustrik's avatar
Martin Sustrik committed
341 342
}

343
void zmq::pipe_t::process_pipe_term_ack ()
344
{
345 346
    //  Notify the user that all the references to the pipe should be dropped.
    zmq_assert (sink);
347
    sink->pipe_terminated (this);
348

349 350 351 352 353
    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
    //  the peer before deallocating this side of the pipe.
    //  All the other states are invalid.
    if (state == term_req_sent1) {
354
        outpipe = NULL;
355
        send_pipe_term_ack (peer);
356
    }
357
    else
358
        zmq_assert (state == term_ack_sent || state == term_req_sent2);
359 360 361 362 363 364

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.
365

danielkr's avatar
danielkr committed
366
    if (!conflate) {
367 368 369 370 371
        msg_t msg;
        while (inpipe->read (&msg)) {
            int rc = msg.close ();
            errno_assert (rc == 0);
        }
Martin Hurton's avatar
Martin Hurton committed
372
    }
373

374
    LIBZMQ_DELETE(inpipe);
375

376 377
    //  Deallocate the pipe object
    delete this;
Martin Sustrik's avatar
Martin Sustrik committed
378 379
}

Ian Barber's avatar
Ian Barber committed
380 381 382 383 384
void zmq::pipe_t::set_nodelay ()
{
    this->delay = false;
}

385
void zmq::pipe_t::terminate (bool delay_)
Martin Sustrik's avatar
Martin Sustrik committed
386
{
387 388 389
    //  Overload the value specified at pipe creation.
    delay = delay_;

390
    //  If terminate was already called, we can ignore the duplicate invocation.
391
    if (state == term_req_sent1 || state == term_req_sent2) {
392
        return;
393
	}
394 395
    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
396
    else if (state == term_ack_sent) {
397
        return;
398
	}
399 400
    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
401
    else if (state == active) {
402
        send_pipe_term (peer);
403
        state = term_req_sent1;
404 405 406
    }
    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
407
    else if (state == waiting_for_delimiter && !delay) {
Martin Hurton's avatar
Martin Hurton committed
408 409
        outpipe = NULL;
        send_pipe_term_ack (peer);
410
        state = term_ack_sent;
411
    }
Sergey M․'s avatar
Sergey M․ committed
412
    //  If there are pending messages still available, do nothing.
413
    else if (state == waiting_for_delimiter) {
414 415 416
    }
    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
Martin Hurton's avatar
Martin Hurton committed
417
    //  active state.
418
    else if (state == delimiter_received) {
419
        send_pipe_term (peer);
420
        state = term_req_sent1;
421 422
    }
    //  There are no other states.
423
    else {
424
        zmq_assert (false);
425
	}
Martin Sustrik's avatar
Martin Sustrik committed
426

427
    //  Stop outbound flow of messages.
428
    out_active = false;
Martin Hurton's avatar
Martin Hurton committed
429

430
    if (outpipe) {
431

432 433
        //  Drop any unfinished outbound messages.
        rollback ();
434

435 436 437 438 439 440
        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
        outpipe->write (msg, false);
        flush ();
441
    }
Martin Sustrik's avatar
Martin Sustrik committed
442 443
}

444
bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
Martin Sustrik's avatar
Martin Sustrik committed
445
{
446
    return msg_.is_delimiter ();
Martin Sustrik's avatar
Martin Sustrik committed
447
}
448

449
//  Derives the low watermark from the given high watermark.
int zmq::pipe_t::compute_lwm (int hwm_)
{
    //  The following points are taken into consideration:
    //
    //  1. LWM should not exceed HWM.
    //  2. LWM cannot be very low (such as zero): once the queue had filled
    //     up, it would start to refill only after all the messages were
    //     read from it, holding the progress back unnecessarily.
    //  3. LWM cannot be very high (such as HWM-1): the queue would be
    //     filled in lock-step - a single message is read from a full queue,
    //     the writer thread is resumed to write exactly one message and
    //     goes back to sleep immediately - which results in low performance.
    //
    //  Given point 3 it is good to keep HWM and LWM far apart to reduce the
    //  thread switching overhead. LWM is therefore half of HWM, rounded up.
    return (hwm_ + 1) / 2;
}
471

472
void zmq::pipe_t::process_delimiter ()
473
{
474 475
    zmq_assert (state == active
            ||  state == waiting_for_delimiter);
476

477 478 479
    if (state == active)
        state = delimiter_received;
    else {
480
        outpipe = NULL;
481
        send_pipe_term_ack (peer);
482
        state = term_ack_sent;
483 484
    }
}
485 486 487 488 489 490 491 492 493 494 495 496

void zmq::pipe_t::hiccup ()
{
    //  If termination is already under way do nothing.
    if (state != active)
        return;

    //  We'll drop the pointer to the inpipe. From now on, the peer is
    //  responsible for deallocating it.
    inpipe = NULL;

    //  Create new inpipe.
497
    if (conflate)
498
        inpipe = new (std::nothrow)ypipe_conflate_t <msg_t>();
499
    else
500
        inpipe = new (std::nothrow)ypipe_t <msg_t, message_pipe_granularity>();
501

502 503 504 505 506 507 508
    alloc_assert (inpipe);
    in_active = true;

    //  Notify the peer about the hiccup.
    send_hiccup (peer, (void*) inpipe);
}

509 510
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
511 512 513 514 515
    int in = inhwm_ + inhwmboost;
    int out = outhwm_ + outhwmboost;

    // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
    if (inhwm_ <= 0 || inhwmboost <= 0)
516 517
        in = 0;

518
    if (outhwm_ <= 0 || outhwmboost <= 0)
519
        out = 0;
520

521 522
    lwm = compute_lwm(in);
    hwm = out;
523 524 525 526 527 528
}

void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
{
    inhwmboost = inhwmboost_;
    outhwmboost = outhwmboost_;
529
}
530

Martin Hurton's avatar
Martin Hurton committed
531
bool zmq::pipe_t::check_hwm () const
532
{
533
    bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
534 535
    return( !full );
}