ctx.hpp 6.85 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
15

16
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
22 23

#include <map>
24
#include <vector>
Martin Sustrik's avatar
Martin Sustrik committed
25
#include <string>
26
#include <stdarg.h>
Martin Sustrik's avatar
Martin Sustrik committed
27

28
#include "mailbox.hpp"
29
#include "array.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
30 31 32
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
33
#include "options.hpp"
34
#include "atomic_counter.hpp"
35
#include "thread.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36

Martin Sustrik's avatar
Martin Sustrik committed
37
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
38
{
39 40 41 42 43

    class object_t;
    class io_thread_t;
    class socket_base_t;
    class reaper_t;
44
    class pipe_t;
45

46 47 48 49 50
    //  Information associated with inproc endpoint. Note that endpoint options
    //  are registered as well so that the peer can access them without a need
    //  for synchronisation, handshaking or similar.
    struct endpoint_t
    {
Martin Hurton's avatar
Martin Hurton committed
51
        socket_base_t *socket;
52 53
        options_t options;
    };
54 55 56

    //  Context object encapsulates all the global state associated with
    //  the library.
Martin Hurton's avatar
Martin Hurton committed
57

58
    class ctx_t
Martin Sustrik's avatar
Martin Sustrik committed
59 60 61
    {
    public:

Martin Hurton's avatar
Martin Hurton committed
62
        //  Create the context object.
63
        ctx_t ();
Martin Sustrik's avatar
Martin Sustrik committed
64

65 66 67
        //  Returns false if object is not a context.
        bool check_tag ();

68 69 70 71
        //  This function is called when user invokes zmq_term. If there are
        //  no more sockets open it'll cause all the infrastructure to be shut
        //  down. If there are open sockets still, the deallocation happens
        //  after the last one is closed.
72
        int terminate ();
Martin Sustrik's avatar
Martin Sustrik committed
73

74 75 76 77 78 79 80 81 82
        // This function starts the terminate process by unblocking any blocking
        // operations currently in progress and stopping any more socket activity
        // (except zmq_close).
        // This function is non-blocking.
        // terminate must still be called afterwards.
        // This function is optional, terminate will unblock any current 
        // operations as well.
        int shutdown();

Martin Hurton's avatar
Martin Hurton committed
83
        //  Set and get context properties.
84 85
        int set (int option_, int optval_);
        int get (int option_);
Martin Hurton's avatar
Martin Hurton committed
86

87
        //  Create and destroy a socket.
88 89
        zmq::socket_base_t *create_socket (int type_);
        void destroy_socket (zmq::socket_base_t *socket_);
90

91 92 93
		//  Start a new thread with proper scheduling parameters.
        void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;

Martin Sustrik's avatar
Martin Sustrik committed
94 95
        //  Send command to the destination thread.
        void send_command (uint32_t tid_, const command_t &command_);
96

Martin Sustrik's avatar
Martin Sustrik committed
97
        //  Returns the I/O thread that is the least busy at the moment.
98
        //  Affinity specifies which I/O threads are eligible (0 = all).
Martin Hurton's avatar
Martin Hurton committed
99
        //  Returns NULL if no I/O thread is available.
100
        zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
Martin Sustrik's avatar
Martin Sustrik committed
101

102
        //  Returns reaper thread object.
103
        zmq::object_t *get_reaper ();
104

105
        //  Management of inproc endpoints.
106
        int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
Martin Hurton's avatar
Martin Hurton committed
107
        int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
108
        void unregister_endpoints (zmq::socket_base_t *socket_);
109
        endpoint_t find_endpoint (const char *addr_);
Martin Hurton's avatar
Martin Hurton committed
110 111
        void pend_connection (const std::string &addr_,
                const endpoint_t &endpoint_, pipe_t **pipes_);
112
        void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
113

114 115 116 117 118
        enum {
            term_tid = 0,
            reaper_tid = 1
        };

119
        ~ctx_t ();
120

Martin Sustrik's avatar
Martin Sustrik committed
121 122
    private:

Martin Hurton's avatar
Martin Hurton committed
123 124 125 126 127 128
        struct pending_connection_t
        {
            endpoint_t endpoint;
            pipe_t* connect_pipe;
            pipe_t* bind_pipe;
        };
129

130 131 132
        //  Used to check whether the object is a context.
        uint32_t tag;

133 134 135
        //  Sockets belonging to this context. We need the list so that
        //  we can notify the sockets when zmq_term() is called. The sockets
        //  will return ETERM then.
136
        typedef array_t <socket_base_t> sockets_t;
137 138
        sockets_t sockets;

Martin Sustrik's avatar
Martin Sustrik committed
139
        //  List of unused thread slots.
140 141
        typedef std::vector <uint32_t> empty_slots_t;
        empty_slots_t empty_slots;
142

Martin Hurton's avatar
Martin Hurton committed
143 144
        //  If true, zmq_init has been called but no socket has been created
        //  yet. Launching of I/O threads is delayed.
145 146
        bool starting;

147 148
        //  If true, zmq_term was already called.
        bool terminating;
Martin Sustrik's avatar
Martin Sustrik committed
149

150
        //  Synchronisation of accesses to global slot-related data:
151
        //  sockets, empty_slots, terminating. It also synchronises
152
        //  access to zombie sockets as such (as opposed to slots) and provides
153 154
        //  a memory barrier to ensure that all CPU cores see the same data.
        mutex_t slot_sync;
Martin Sustrik's avatar
Martin Sustrik committed
155

156
        //  The reaper thread.
157
        zmq::reaper_t *reaper;
158

Martin Sustrik's avatar
Martin Sustrik committed
159
        //  I/O threads.
160
        typedef std::vector <zmq::io_thread_t*> io_threads_t;
Martin Sustrik's avatar
Martin Sustrik committed
161 162
        io_threads_t io_threads;

163
        //  Array of pointers to mailboxes for both application and I/O threads.
164
        uint32_t slot_count;
165
        mailbox_t **slots;
166

167 168 169
        //  Mailbox for zmq_term thread.
        mailbox_t term_mailbox;

170
        //  List of inproc endpoints within this context.
171
        typedef std::map <std::string, endpoint_t> endpoints_t;
172 173
        endpoints_t endpoints;

174
        // List of inproc connection endpoints pending a bind
175
        typedef std::multimap <std::string, pending_connection_t> pending_connections_t;
176 177
        pending_connections_t pending_connections;

178 179 180
        //  Synchronisation of access to the list of inproc endpoints.
        mutex_t endpoints_sync;

181 182 183 184 185 186 187 188 189
        //  Maximum socket ID.
        static atomic_counter_t max_socket_id;

        //  Maximum number of sockets that can be opened at the same time.
        int max_sockets;

        //  Number of I/O threads to launch.
        int io_thread_count;

Pieter Hintjens's avatar
Pieter Hintjens committed
190 191 192
        //  Is IPv6 enabled on this context?
        bool ipv6;

193 194 195 196
		//  Thread scheduling parameters.
        int thread_priority;
        int thread_sched_policy;

197 198
        //  Synchronisation of access to context options.
        mutex_t opt_sync;
Martin Hurton's avatar
Martin Hurton committed
199

200
        ctx_t (const ctx_t&);
201
        const ctx_t &operator = (const ctx_t&);
202 203 204 205 206

#ifdef HAVE_FORK
        // the process that created this context. Used to detect forking.
        pid_t pid;
#endif
207
        enum side { connect_side, bind_side };
Martin Hurton's avatar
Martin Hurton committed
208
        void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, const pending_connection_t &pending_connection_, side side_);
Martin Sustrik's avatar
Martin Sustrik committed
209
    };
Martin Hurton's avatar
Martin Hurton committed
210

Martin Sustrik's avatar
Martin Sustrik committed
211 212 213
}

#endif