/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_PIPE_HPP_INCLUDED__
#define __ZMQ_PIPE_HPP_INCLUDED__

#include "ypipe_base.hpp"
#include "config.hpp"
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
#include "blob.hpp"
#include "options.hpp"
#include "endpoint.hpp"

namespace zmq
{
//  Forward declarations. Within this header both types are used only
//  through pointers, references or as a template argument.
class msg_t;
class pipe_t;

//  Create a pipepair for bi-directional transfer of messages.
//  Every argument is a two-element array with one entry per pipe of the
//  pair.
//  First HWM is for messages passed from first pipe to the second pipe.
//  Second HWM is for messages passed from second pipe to the first pipe.
//  If conflate is true, only the most recently arrived message could be
//  read (older messages are discarded).
//  NOTE(review): earlier wording described a 'delay' argument (receive all
//  pending messages before terminating vs. terminate straight away), but
//  this signature has no such parameter. Delay is passed to
//  pipe_t::terminate instead; confirm and keep this comment in sync.
//  NOTE(review): the meaning of the int return value is not visible in
//  this header - presumably 0 on success; confirm in pipe.cpp.
int pipepair (zmq::object_t *parents_[2],
              zmq::pipe_t *pipes_[2],
              int hwms_[2],
              bool conflate_[2]);

struct i_pipe_events
{
62
    virtual ~i_pipe_events () ZMQ_DEFAULT;
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87

    virtual void read_activated (zmq::pipe_t *pipe_) = 0;
    virtual void write_activated (zmq::pipe_t *pipe_) = 0;
    virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
    virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
};

//  One endpoint of a pipepair (see pipepair above the class).
//
//  Note that pipe can be stored in three different arrays.
//  The array of inbound pipes (1), the array of outbound pipes (2) and
//  the generic array of pipes to be deallocated (3).

class pipe_t : public object_t,
               public array_item_t<1>,
               public array_item_t<2>,
               public array_item_t<3>
{
    //  This allows pipepair to create pipe objects.
    friend int pipepair (zmq::object_t *parents_[2],
                         zmq::pipe_t *pipes_[2],
                         int hwms_[2],
                         bool conflate_[2]);

  public:
    //  Specifies the object to send events to.
    void set_event_sink (i_pipe_events *sink_);

    //  Pipe endpoint can store a routing ID to be used by its clients.
    void set_server_socket_routing_id (uint32_t server_socket_routing_id_);
    uint32_t get_server_socket_routing_id () const;

    //  Pipe endpoint can store an opaque ID to be used by its clients.
    //  NOTE(review): the getter is named get_routing_id while the setter is
    //  set_router_socket_routing_id; presumably both operate on
    //  _router_socket_routing_id - confirm in pipe.cpp.
    void set_router_socket_routing_id (const blob_t &router_socket_routing_id_);
    const blob_t &get_routing_id () const;

    //  Returns true if there is at least one message to read in the pipe.
    bool check_read ();

    //  Reads a message from the underlying pipe.
    //  NOTE(review): presumably returns false when nothing could be read;
    //  confirm in pipe.cpp.
    bool read (msg_t *msg_);

    //  Checks whether messages can be written to the pipe. If the pipe is
    //  closed or if writing the message would cause high watermark the
    //  function returns false.
    bool check_write ();

    //  Writes a message to the underlying pipe. Returns false if the
    //  message does not pass check_write. If false, the message object
    //  retains ownership of its message buffer.
    bool write (msg_t *msg_);

    //  Remove unfinished parts of the outbound message from the pipe.
    void rollback () const;

    //  Flush the messages downstream.
    void flush ();

    //  Temporarily disconnects the inbound message stream and drops
    //  all the messages on the fly. Causes 'hiccuped' event to be generated
    //  in the peer.
    void hiccup ();

    //  Ensure the pipe won't block on receiving pipe_term.
    void set_nodelay ();

    //  Ask pipe to terminate. The termination will happen asynchronously
    //  and user will be notified about actual deallocation by 'terminated'
    //  event. If delay is true, the pending messages will be processed
    //  before actual shutdown.
    void terminate (bool delay_);

    //  Set the high water marks.
    void set_hwms (int inhwm_, int outhwm_);

    //  Set the boost to high water marks, used by inproc sockets so total
    //  hwm are sum of connect and bind sockets watermarks.
    void set_hwms_boost (int inhwmboost_, int outhwmboost_);

    //  Send a command to the peer to notify it about the change of hwm.
    void send_hwms_to_peer (int inhwm_, int outhwm_);

    //  Returns true if HWM is not reached.
    bool check_hwm () const;

    //  Accessors for the endpoints of this pipe (see _endpoint_pair).
    void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
    const endpoint_uri_pair_t &get_endpoint_pair () const;

    //  NOTE(review): presumably asks the peer to report pipe statistics on
    //  behalf of socket_base_ (see process_pipe_peer_stats); confirm in
    //  pipe.cpp.
    void send_stats_to_peer (own_t *socket_base_);

  private:
    //  Type of the underlying lock-free pipe.
    typedef ypipe_base_t<msg_t> upipe_t;

    //  Command handlers.
    void process_activate_read ();
    void process_activate_write (uint64_t msgs_read_);
    void process_hiccup (void *pipe_);
    void process_pipe_peer_stats (uint64_t queue_count_,
                                  own_t *socket_base_,
                                  endpoint_uri_pair_t *endpoint_pair_);
    void process_pipe_term ();
    void process_pipe_term_ack ();
    void process_pipe_hwm (int inhwm_, int outhwm_);

    //  Handler for delimiter read from the pipe.
    void process_delimiter ();

    //  Constructor is private. Pipe can only be created using
    //  pipepair function.
    pipe_t (object_t *parent_,
            upipe_t *inpipe_,
            upipe_t *outpipe_,
            int inhwm_,
            int outhwm_,
            bool conflate_);

    //  Pipepair uses this function to let us know about
    //  the peer pipe object.
    void set_peer (pipe_t *peer_);

    //  Destructor is private. Pipe objects destroy themselves.
    ~pipe_t ();

    //  Underlying pipes for both directions.
    upipe_t *_in_pipe;
    upipe_t *_out_pipe;

    //  Can the pipe be read from / written to?
    bool _in_active;
    bool _out_active;

    //  High watermark for the outbound pipe.
    int _hwm;

    //  Low watermark for the inbound pipe.
    int _lwm;

    //  Boosts for high and low watermarks, used with inproc sockets so hwm
    //  are sum of send and recv hwms on each side of pipe.
    int _in_hwm_boost;
    int _out_hwm_boost;

    //  Number of messages read and written so far.
    uint64_t _msgs_read;
    uint64_t _msgs_written;

    //  Last received peer's msgs_read. The actual number in the peer
    //  can be higher at the moment.
    uint64_t _peers_msgs_read;

    //  The pipe object on the other side of the pipepair.
    pipe_t *_peer;

    //  Sink to send events to.
    i_pipe_events *_sink;

    //  States of the pipe endpoint:
    //  active: common state before any termination begins,
    //  delimiter_received: delimiter was read from pipe before
    //      term command was received,
    //  waiting_for_delimiter: term command was already received
    //      from the peer but there are still pending messages to read,
    //  term_ack_sent: all pending messages were already read and
    //      all we are waiting for is ack from the peer,
    //  term_req_sent1: 'terminate' was explicitly called by the user,
    //  term_req_sent2: user called 'terminate' and then we've got
    //      term command from the peer as well.
    enum
    {
        active,
        delimiter_received,
        waiting_for_delimiter,
        term_ack_sent,
        term_req_sent1,
        term_req_sent2
    } _state;

    //  If true, we receive all the pending inbound messages before
    //  terminating. If false, we terminate immediately when the peer
    //  asks us to.
    bool _delay;

    //  Routing id of the writer. Used uniquely by the reader side.
    blob_t _router_socket_routing_id;

    //  Numeric routing id of the writer. Used uniquely by the reader side.
    //  Declared uint32_t to match set/get_server_socket_routing_id, so
    //  ids above INT_MAX are not pushed through a signed conversion.
    uint32_t _server_socket_routing_id;

    //  Returns true if the message is delimiter; false otherwise.
    static bool is_delimiter (const msg_t &msg_);

    //  Computes appropriate low watermark from the given high watermark.
    static int compute_lwm (int hwm_);

    //  NOTE(review): presumably the conflate_ flag passed to the
    //  constructor (only the most recently arrived message can be read
    //  when set); confirm in pipe.cpp.
    const bool _conflate;

    //  The endpoints of this pipe.
    endpoint_uri_pair_t _endpoint_pair;

    ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t)
};

//  NOTE(review): defined outside this header. Presumably sends the routing
//  id configured in options_ to the peer through pipe_; confirm in pipe.cpp.
void send_routing_id (pipe_t *pipe_, const options_t &options_);
}

#endif