stream_engine.hpp 6.7 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31
#ifndef __ZMQ_STREAM_ENGINE_HPP_INCLUDED__
#define __ZMQ_STREAM_ENGINE_HPP_INCLUDED__
32

33 34
#include <stddef.h>

#include <string>

#include "fd.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
#include "i_encoder.hpp"
#include "i_decoder.hpp"
#include "options.hpp"
#include "socket_base.hpp"
#include "metadata.hpp"
43 44 45

namespace zmq
{
46 47 48 49
    //  Protocol revisions.
    //  Anonymous enumeration naming the ZMTP revisions distinguished by
    //  this header. The numeric values are spelled out explicitly so they
    //  cannot drift if enumerators are ever reordered.
    enum
    {
        ZMTP_1_0 = 0,
        ZMTP_2_0 = 1
    };
52

53
    //  Forward declarations of types that the declarations below refer to
    //  through pointers.
    class io_thread_t;
    class msg_t;
    class session_base_t;
    class mechanism_t;
57

58 59 60
    //  This engine handles any socket with SOCK_STREAM semantics,
    //  e.g. TCP socket or an UNIX domain socket.

61
    class stream_engine_t : public io_object_t, public i_engine
62 63 64
    {
    public:

65 66 67
        enum error_reason_t {
            protocol_error,
            connection_error,
68
            timeout_error
69 70
        };

71
        stream_engine_t (fd_t fd_, const options_t &options_,
72
                         const std::string &endpoint);
73
        ~stream_engine_t ();
74

75
        //  i_engine interface implementation.
76 77
        void plug (zmq::io_thread_t *io_thread_,
           zmq::session_base_t *session_);
78
        void terminate ();
79 80
        void restart_input ();
        void restart_output ();
81
        void zap_msg_available ();
82 83 84 85

        //  i_poll_events interface implementation.
        void in_event ();
        void out_event ();
86
        void timer_event (int id_);
87 88

    private:
89 90 91
        //  Unplug the engine from the session.
        void unplug ();

92
        //  Function to handle network disconnections.
93
        void error (error_reason_t reason);
94

Martin Hurton's avatar
Martin Hurton committed
95 96 97 98 99 100
        //  Receives the greeting message from the peer.
        int receive_greeting ();

        //  Detects the protocol used by the peer.
        bool handshake ();

Martin Hurton's avatar
Martin Hurton committed
101 102
        int identity_msg (msg_t *msg_);
        int process_identity_msg (msg_t *msg_);
103

104 105
        int next_handshake_command (msg_t *msg);
        int process_handshake_command (msg_t *msg);
106

107 108 109
        int pull_msg_from_session (msg_t *msg_);
        int push_msg_to_session (msg_t *msg);

110 111
        int push_raw_msg_to_session (msg_t *msg);

112
        int write_credential (msg_t *msg_);
113 114 115 116
        int pull_and_encode (msg_t *msg_);
        int decode_and_push (msg_t *msg_);
        int push_one_then_decode_and_push (msg_t *msg_);

117
        void mechanism_ready ();
118 119 120 121

        size_t add_property (unsigned char *ptr,
            const char *name, const void *value, size_t value_len);

122 123
        void set_handshake_timer();

Thomas Rodgers's avatar
Thomas Rodgers committed
124 125 126
        typedef metadata_t::dict_t properties_t;
        bool init_properties (properties_t & properties);

Jonathan Reams's avatar
Jonathan Reams committed
127 128 129 130
        int produce_ping_message(msg_t * msg_);
        int process_heartbeat_message(msg_t * msg_);
        int produce_pong_message(msg_t * msg_);

131 132 133
        //  Underlying socket.
        fd_t s;

134 135 136
        //  True iff this is server's engine.
        bool as_server;

137
        msg_t tx_msg;
Martin Hurton's avatar
Martin Hurton committed
138

139 140
        handle_t handle;

141
        unsigned char *inpos;
142
        size_t insize;
143
        i_decoder *decoder;
144

145
        unsigned char *outpos;
146
        size_t outsize;
147
        i_encoder *encoder;
148

149 150 151
        //  Metadata to be attached to received messages. May be NULL.
        metadata_t *metadata;

Martin Hurton's avatar
Martin Hurton committed
152 153 154 155 156
        //  When true, we are still trying to determine whether
        //  the peer is using versioned protocol, and if so, which
        //  version.  When false, normal message flow has started.
        bool handshaking;

157 158 159 160 161 162 163 164 165 166
        static const size_t signature_size = 10;

        //  Size of ZMTP/1.0 and ZMTP/2.0 greeting message
        static const size_t v2_greeting_size = 12;

        //  Size of ZMTP/3.0 greeting message
        static const size_t v3_greeting_size = 64;

        //  Expected greeting size.
        size_t greeting_size;
Martin Hurton's avatar
Martin Hurton committed
167

Pieter Hintjens's avatar
Pieter Hintjens committed
168
        //  Greeting received from, and sent to peer
169 170
        unsigned char greeting_recv [v3_greeting_size];
        unsigned char greeting_send [v3_greeting_size];
Martin Hurton's avatar
Martin Hurton committed
171

Pieter Hintjens's avatar
Pieter Hintjens committed
172 173
        //  Size of greeting received so far
        unsigned int greeting_bytes_read;
Martin Hurton's avatar
Martin Hurton committed
174

175
        //  The session this engine is attached to.
176
        zmq::session_base_t *session;
177

178 179
        options_t options;

180
        // String representation of endpoint
181
        std::string endpoint;
182

183 184
        bool plugged;

Martin Hurton's avatar
Martin Hurton committed
185
        int (stream_engine_t::*next_msg) (msg_t *msg_);
186

Martin Hurton's avatar
Martin Hurton committed
187
        int (stream_engine_t::*process_msg) (msg_t *msg_);
188

189 190
        bool io_error;

191 192
        //  Indicates whether the engine is to inject a phantom
        //  subscription message into the incoming stream.
193 194 195
        //  Needed to support old peers.
        bool subscription_required;

196 197
        mechanism_t *mechanism;

198
        //  True iff the engine couldn't consume the last decoded message.
199
        bool input_stopped;
200 201

        //  True iff the engine doesn't have any message to encode.
202
        bool output_stopped;
203

204 205 206 207 208 209
        //  ID of the handshake timer
        enum {handshake_timer_id = 0x40};

        //  True is linger timer is running.
        bool has_handshake_timer;

Jonathan Reams's avatar
Jonathan Reams committed
210 211 212 213 214 215 216 217 218
        //  Heartbeat stuff
        enum {
            heartbeat_ivl_timer_id = 0x80,
            heartbeat_timeout_timer_id = 0x81,
            heartbeat_ttl_timer_id = 0x82
        };
        bool has_ttl_timer;
        bool has_timeout_timer;
        bool has_heartbeat_timer;
219
        int heartbeat_timeout;
Jonathan Reams's avatar
Jonathan Reams committed
220

221 222 223
        // Socket
        zmq::socket_base_t *socket;

224 225
        std::string peer_address;

226 227
        stream_engine_t (const stream_engine_t&);
        const stream_engine_t &operator = (const stream_engine_t&);
228 229 230 231 232
    };

}

#endif