socket_base.hpp 8.15 KB
Newer Older
1
/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2011 VMware, Inc.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__

26
#include <string>
27
#include <map>
28
#include <stdarg.h>
29

30
#include "own.hpp"
31
#include "array.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
32
#include "stdint.hpp"
33
#include "poller.hpp"
34
#include "atomic_counter.hpp"
35
#include "i_poll_events.hpp"
36
#include "mailbox.hpp"
37
#include "stdint.hpp"
38
#include "clock.hpp"
39
#include "pipe.hpp"
40 41 42 43

namespace zmq
{

44 45 46 47
    //  Forward declarations of types referenced by the declarations below.
    class ctx_t;
    class msg_t;
    class pipe_t;

48
    //  Common base for socket objects. Declares the API-layer entry points
    //  (setsockopt, getsockopt, bind, connect, send, recv, close), the
    //  poller and pipe event callbacks, and the virtual x- hooks that
    //  concrete socket types define.
    class socket_base_t :
49
        public own_t,
50
        public array_item_t <>,
51 52
        public i_poll_events,
        public i_pipe_events
53
    {
54 55
        friend class reaper_t;

56 57
    public:

58 59 60
        //  Returns false if object is not a socket.
        bool check_tag ();

61
        //  Create a socket of a specified type.
62
        static socket_base_t *create (int type_, zmq::ctx_t *parent_,
63
            uint32_t tid_, int sid_);
64

65 66
        //  Returns the mailbox associated with this socket.
        mailbox_t *get_mailbox ();
67 68 69 70

        //  Interrupt blocking call if the socket is stuck in one.
        //  This function can be called from a different thread!
        void stop ();
71

72
        //  Interface for communication with the API layer.
73 74
        int setsockopt (int option_, const void *optval_, size_t optvallen_);
        int getsockopt (int option_, void *optval_, size_t *optvallen_);
75 76
        int bind (const char *addr_);
        int connect (const char *addr_);
77
        int term_endpoint (const char *addr_);
78 79
        int send (zmq::msg_t *msg_, int flags_);
        int recv (zmq::msg_t *msg_, int flags_);
80
        int close ();
81

82 83 84 85 86
        //  These functions are used by the polling mechanism to determine
        //  which events are to be reported from this socket.
        bool has_in ();
        bool has_out ();

87 88 89 90 91 92 93 94 95
        //  Using this function reaper thread asks the socket to register with
        //  its poller.
        void start_reaping (poller_t *poller_);

        //  i_poll_events implementation. This interface is used when socket
        //  is handled by the poller in the reaper thread.
        void in_event ();
        void out_event ();
        void timer_event (int id_);
Martin Sustrik's avatar
Martin Sustrik committed
96

97 98 99
        //  i_pipe_events interface implementation.
        void read_activated (pipe_t *pipe_);
        void write_activated (pipe_t *pipe_);
100
        void hiccuped (pipe_t *pipe_);
101
        void terminated (pipe_t *pipe_);
skaller's avatar
skaller committed
102 103
        //  NOTE(review): definitions are not visible here; presumably these
        //  acquire and release the mutex member declared at the end of the
        //  class - confirm.
        void lock();
        void unlock();
104

105 106 107 108 109 110 111 112 113 114 115 116
        //  NOTE(review): presumably starts reporting the events selected by
        //  the events_ bitmask on endpoint_ - confirm against the definition.
        int monitor(const char *endpoint_, int events_);

        //  NOTE(review): one notification hook per monitored socket event;
        //  presumably each one is forwarded through monitor_event () -
        //  confirm against the definitions.
        void event_connected(const char *addr_, int fd_);
        void event_connect_delayed(const char *addr_, int err_);
        void event_connect_retried(const char *addr_, int interval_);
        void event_listening(const char *addr_, int fd_);
        void event_bind_failed(const char *addr_, int err_);
        void event_accepted(const char *addr_, int fd_);
        void event_accept_failed(const char *addr_, int err_);
        void event_closed(const char *addr_, int fd_);        
        void event_close_failed(const char *addr_, int fd_);  
        void event_disconnected(const char *addr_, int fd_); 
117

118 119
    protected:

120
        socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
121 122
        virtual ~socket_base_t ();

123 124
        //  Concrete algorithms for the x- methods are to be defined by
        //  individual socket types.
125 126
        virtual void xattach_pipe (zmq::pipe_t *pipe_,
            bool icanhasall_ = false) = 0;
127

128 129 130
        //  The default implementation assumes there are no specific socket
        //  options for the particular socket type. If not so, overload this
        //  method.
131
        virtual int xsetsockopt (int option_, const void *optval_,
132 133 134 135
            size_t optvallen_);

        //  The default implementation assumes that send is not supported.
        virtual bool xhas_out ();
136
        virtual int xsend (zmq::msg_t *msg_, int flags_);
137 138 139

        //  The default implementation assumes that recv is not supported.
        virtual bool xhas_in ();
140
        virtual int xrecv (zmq::msg_t *msg_, int flags_);
Martin Sustrik's avatar
Martin Sustrik committed
141

142 143 144
        //  i_pipe_events will be forwarded to these functions.
        virtual void xread_activated (pipe_t *pipe_);
        virtual void xwrite_activated (pipe_t *pipe_);
145
        virtual void xhiccuped (pipe_t *pipe_);
146
        virtual void xterminated (pipe_t *pipe_) = 0;
147

148 149 150
        //  Delay actual destruction of the socket.
        void process_destroy ();

151 152 153 154 155 156
        // Socket event data dispath
        void monitor_event (zmq_event_t data_);

        // Monitor socket cleanup
        void stop_monitor ();

157
    private:
158 159 160 161 162 163
        //  Creates new endpoint ID and adds the endpoint to the map.
        void add_endpoint (const char *addr_, own_t *endpoint_);

        //  Map of open endpoints.
        typedef std::multimap <std::string, own_t *> endpoints_t;
        endpoints_t endpoints;
164

165 166 167 168
        //  To be called after processing commands or invoking any command
        //  handlers explicitly. If required, it will deallocate the socket.
        void check_destroy ();

169 170 171 172
        //  Moves the flags from the message to local variables,
        //  to be later retrieved by getsockopt.
        void extract_flags (msg_t *msg_);

173 174 175
        //  Used to check whether the object is a socket.
        uint32_t tag;

176 177
        //  If true, associated context was already terminated.
        bool ctx_terminated;
178

179 180 181 182 183
        //  If true, object should have been already destroyed. However,
        //  destruction is delayed while we unwind the stack to the point
        //  where it doesn't intersect the object being destroyed.
        bool destroyed;

184 185 186 187
        //  Parse URI string.
        int parse_uri (const char *uri_, std::string &protocol_,
            std::string &address_);

188 189 190
        //  Check whether transport protocol, as specified in connect or
        //  bind, is available and compatible with the socket type.
        int check_protocol (const std::string &protocol_);
191

192
        //  Register the pipe with this socket.
193
        void attach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false);
194

195 196
        //  Processes commands sent to this socket (if any). If timeout is -1,
        //  returns only after at least one command was processed.
197 198
        //  If throttle argument is true, commands are processed at most once
        //  in a predefined time period.
199
        int process_commands (int timeout_, bool throttle_);
200

201
        //  Handlers for incoming commands.
202
        void process_stop ();
203
        void process_bind (zmq::pipe_t *pipe_);
204
        void process_term (int linger_);
205

206 207
        //  Socket's mailbox object.
        mailbox_t mailbox;
208

209 210 211 212
        //  List of attached pipes.
        typedef array_t <pipe_t, 3> pipes_t;
        pipes_t pipes;

213 214 215 216
        //  Reaper's poller and handle of this socket within it.
        poller_t *poller;
        poller_t::handle_t handle;

217
        //  Timestamp of when commands were processed the last time.
Martin Sustrik's avatar
Martin Sustrik committed
218
        uint64_t last_tsc;
219

Martin Sustrik's avatar
Martin Sustrik committed
220 221 222
        //  Number of messages received since last command processing.
        int ticks;

223
        //  True if the last message received had MORE flag set.
224 225
        bool rcvmore;

226 227 228
        //  Improves efficiency of time measurement.
        clock_t clock;

229 230 231 232 233 234
        // Monitor socket;
        void *monitor_socket;

        // Bitmask of events being monitored
        int monitor_events;

235
        //  Copy constructor and copy assignment are declared private, so a
        //  socket cannot be copied from outside the class and its friends.
        socket_base_t (const socket_base_t&);
236
        const socket_base_t &operator = (const socket_base_t&);
skaller's avatar
skaller committed
237
        mutex_t sync;
238 239 240 241 242
    };

}

#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
243