pipe.hpp 9.02 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

Martin Sustrik's avatar
Martin Sustrik committed
30 31
#ifndef __ZMQ_PIPE_HPP_INCLUDED__
#define __ZMQ_PIPE_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
32

33
#include "msg.hpp"
34
#include "ypipe_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "config.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "object.hpp"
37
#include "stdint.hpp"
38
#include "array.hpp"
39
#include "blob.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40

Martin Sustrik's avatar
Martin Sustrik committed
41
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
42 43
{

44 45 46
    class object_t;
    class pipe_t;

47 48 49
    //  Create a pipepair for bi-directional transfer of messages.
    //  First HWM is for messages passed from first pipe to the second pipe.
    //  Second HWM is for messages passed from second pipe to the first pipe.
50 51 52
    //  Delay specifies how the pipe behaves when the peer terminates. If true
    //  pipe receives all the pending messages before terminating, otherwise it
    //  terminates straight away.
53 54
    //  If conflate is true, only the most recently arrived message could be
    //  read (older messages are discarded)
55
    int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
Ian Barber's avatar
Ian Barber committed
56
        int hwms_ [2], bool conflate_ [2]);
57

58
    struct i_pipe_events
59
    {
60
        virtual ~i_pipe_events () {}
61

62 63 64
        virtual void read_activated (zmq::pipe_t *pipe_) = 0;
        virtual void write_activated (zmq::pipe_t *pipe_) = 0;
        virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
65
        virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
66 67
    };

68 69
    //  Note that pipe can be stored in three different arrays.
    //  The array of inbound pipes (1), the array of outbound pipes (2) and
70
    //  the generic array of pipes to be deallocated (3).
71

72 73
    class pipe_t :
        public object_t,
74 75 76
        public array_item_t <1>,
        public array_item_t <2>,
        public array_item_t <3>
Martin Sustrik's avatar
Martin Sustrik committed
77
    {
78
        //  This allows pipepair to create pipe objects.
Ian Barber's avatar
Ian Barber committed
79 80
        friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
            int hwms_ [2], bool conflate_ [2]);
Martin Hurton's avatar
Martin Hurton committed
81

82
    public:
Martin Sustrik's avatar
Martin Sustrik committed
83

84 85
        //  Specifies the object to send events to.
        void set_event_sink (i_pipe_events *sink_);
Martin Sustrik's avatar
Martin Sustrik committed
86

87
        //  Pipe endpoint can store an routing ID to be used by its clients.
88 89
        void set_routing_id (uint32_t routing_id_);
        uint32_t get_routing_id ();
90

91
        //  Pipe endpoint can store an opaque ID to be used by its clients.
92 93
        void set_identity (const blob_t &identity_);
        blob_t get_identity ();
94

95 96
        blob_t get_credential () const;

97 98 99
        //  Returns true if there is at least one message to read in the pipe.
        bool check_read ();

Martin Sustrik's avatar
Martin Sustrik committed
100
        //  Reads a message to the underlying pipe.
101
        bool read (msg_t *msg_);
Martin Sustrik's avatar
Martin Sustrik committed
102

103 104 105
        //  Checks whether messages can be written to the pipe. If the pipe is
        //  closed or if writing the message would cause high watermark the
        //  function returns false.
106
        bool check_write ();
Martin Sustrik's avatar
Martin Sustrik committed
107 108

        //  Writes a message to the underlying pipe. Returns false if the
109 110
        //  message does not pass check_write. If false, the message object
        //  retains ownership of its message buffer.
111
        bool write (msg_t *msg_);
Martin Sustrik's avatar
Martin Sustrik committed
112

113
        //  Remove unfinished parts of the outbound message from the pipe.
114 115
        void rollback ();

Sergey M․'s avatar
Sergey M․ committed
116
        //  Flush the messages downstream.
Martin Sustrik's avatar
Martin Sustrik committed
117 118
        void flush ();

Sergey M․'s avatar
Sergey M․ committed
119
        //  Temporarily disconnects the inbound message stream and drops
120 121 122
        //  all the messages on the fly. Causes 'hiccuped' event to be generated
        //  in the peer.
        void hiccup ();
Martin Hurton's avatar
Martin Hurton committed
123

124
        //  Ensure the pipe won't block on receiving pipe_term.
Ian Barber's avatar
Ian Barber committed
125
        void set_nodelay ();
126

127 128
        //  Ask pipe to terminate. The termination will happen asynchronously
        //  and user will be notified about actual deallocation by 'terminated'
129 130 131
        //  event. If delay is true, the pending messages will be processed
        //  before actual shutdown.
        void terminate (bool delay_);
Martin Sustrik's avatar
Martin Sustrik committed
132

133
        //  Set the high water marks.
134 135
        void set_hwms (int inhwm_, int outhwm_);

136
        //  Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
137 138
        void set_hwms_boost(int inhwmboost_, int outhwmboost_);

139 140 141
        // send command to peer for notify the change of hwm
        void send_hwms_to_peer(int inhwm_, int outhwm_);

142
        //  Returns true if HWM is not reached
Martin Hurton's avatar
Martin Hurton committed
143
        bool check_hwm () const;
Martin Sustrik's avatar
Martin Sustrik committed
144 145
    private:

146
        //  Type of the underlying lock-free pipe.
147
        typedef ypipe_base_t <msg_t> upipe_t;
148

Martin Sustrik's avatar
Martin Sustrik committed
149
        //  Command handlers.
150 151
        void process_activate_read ();
        void process_activate_write (uint64_t msgs_read_);
152
        void process_hiccup (void *pipe_);
Martin Sustrik's avatar
Martin Sustrik committed
153
        void process_pipe_term ();
154
        void process_pipe_term_ack ();
155
        void process_pipe_hwm (int inhwm_, int outhwm_);
156

157
        //  Handler for delimiter read from the pipe.
158
        void process_delimiter ();
159

160 161 162
        //  Constructor is private. Pipe can only be created using
        //  pipepair function.
        pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
Ian Barber's avatar
Ian Barber committed
163
            int inhwm_, int outhwm_, bool conflate_);
Martin Hurton's avatar
Martin Hurton committed
164

165 166 167
        //  Pipepair uses this function to let us know about
        //  the peer pipe object.
        void set_peer (pipe_t *pipe_);
Martin Hurton's avatar
Martin Hurton committed
168

169 170
        //  Destructor is private. Pipe objects destroy themselves.
        ~pipe_t ();
Martin Sustrik's avatar
Martin Sustrik committed
171

172 173 174
        //  Underlying pipes for both directions.
        upipe_t *inpipe;
        upipe_t *outpipe;
Martin Sustrik's avatar
Martin Sustrik committed
175

176 177 178 179 180
        //  Can the pipe be read from / written to?
        bool in_active;
        bool out_active;

        //  High watermark for the outbound pipe.
181
        int hwm;
Martin Sustrik's avatar
Martin Sustrik committed
182

183 184
        //  Low watermark for the inbound pipe.
        int lwm;
Martin Hurton's avatar
Martin Hurton committed
185

186 187 188 189
        // boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe
        int inhwmboost;
        int outhwmboost;

190 191
        //  Number of messages read and written so far.
        uint64_t msgs_read;
Martin Hurton's avatar
Martin Hurton committed
192 193
        uint64_t msgs_written;

194 195 196
        //  Last received peer's msgs_read. The actual number in the peer
        //  can be higher at the moment.
        uint64_t peers_msgs_read;
Martin Hurton's avatar
Martin Hurton committed
197

198 199 200 201 202 203
        //  The pipe object on the other side of the pipepair.
        pipe_t *peer;

        //  Sink to send events to.
        i_pipe_events *sink;

204 205 206 207
        //  States of the pipe endpoint:
        //  active: common state before any termination begins,
        //  delimiter_received: delimiter was read from pipe before
        //      term command was received,
Sergey M․'s avatar
Sergey M․ committed
208
        //  waiting_for_delimiter: term command was already received
209 210 211 212 213 214
        //      from the peer but there are still pending messages to read,
        //  term_ack_sent: all pending messages were already read and
        //      all we are waiting for is ack from the peer,
        //  term_req_sent1: 'terminate' was explicitly called by the user,
        //  term_req_sent2: user called 'terminate' and then we've got
        //      term command from the peer as well.
215 216
        enum {
            active,
217 218 219 220 221
            delimiter_received,
            waiting_for_delimiter,
            term_ack_sent,
            term_req_sent1,
            term_req_sent2
222
        } state;
223 224 225 226 227 228

        //  If true, we receive all the pending inbound messages before
        //  terminating. If false, we terminate immediately when the peer
        //  asks us to.
        bool delay;

229 230
        //  Identity of the writer. Used uniquely by the reader side.
        blob_t identity;
231

232 233 234
        //  Identity of the writer. Used uniquely by the reader side.
        int routing_id;

235 236 237
        //  Pipe's credential.
        blob_t credential;

238
        //  Returns true if the message is delimiter; false otherwise.
239
        static bool is_delimiter (const msg_t &msg_);
240 241 242 243

        //  Computes appropriate low watermark from the given high watermark.
        static int compute_lwm (int hwm_);

Martin Hurton's avatar
Martin Hurton committed
244
        const bool conflate;
245

246 247 248
        //  Disable copying.
        pipe_t (const pipe_t&);
        const pipe_t &operator = (const pipe_t&);
Martin Sustrik's avatar
Martin Sustrik committed
249 250
    };

Martin Sustrik's avatar
Martin Sustrik committed
251 252 253
}

#endif