/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
#define __ZMQ_DISPATCHER_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
22 23

#include <vector>
Martin Sustrik's avatar
Martin Sustrik committed
24
#include <set>
Martin Sustrik's avatar
Martin Sustrik committed
25 26 27 28 29 30 31 32 33
#include <map>
#include <string>

#include "i_signaler.hpp"
#include "ypipe.hpp"
#include "command.hpp"
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
34
#include "thread.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35

Martin Sustrik's avatar
Martin Sustrik committed
36
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
37 38 39 40 41
{

    //  Dispatcher implements bidirectional thread-safe passing of commands
    //  between N threads. It consists of ypipes to pass commands and
    //  signalers to wake up the receiver thread when new commands are
    //  available. Note that dispatcher is inefficient for passing messages
    //  within a thread (sender thread = receiver thread). The optimisation is
    //  not part of the class and should be implemented by individual threads
    //  (presumably by calling the command handling function directly).

    class dispatcher_t
Martin Sustrik's avatar
Martin Sustrik committed
48 49 50
    {
    public:

51
        //  Create the dispatcher object. Matrix of pipes to communicate between
Martin Sustrik's avatar
Martin Sustrik committed
52 53
        //  each socket and each I/O thread is created along with appropriate
        //  signalers.
54
        dispatcher_t (int app_threads_, int io_threads_, int flags_);
Martin Sustrik's avatar
Martin Sustrik committed
55

56 57 58 59 60
        //  This function is called when user invokes zmq_term. If there are
        //  no more sockets open it'll cause all the infrastructure to be shut
        //  down. If there are open sockets still, the deallocation happens
        //  after the last one is closed.
        int term ();
Martin Sustrik's avatar
Martin Sustrik committed
61

62
        //  Create a socket.
63
        class socket_base_t *create_socket (int type_);
Martin Sustrik's avatar
Martin Sustrik committed
64

65 66 67
        //  Destroy a socket.
        void destroy_socket ();

68 69 70 71
        //  Called by app_thread_t when it has no more sockets. The function
        //  should disassociate the object from the current OS thread.
        void no_sockets (class app_thread_t *thread_);

72
        //  Returns number of thread slots in the dispatcher. To be used by
Martin Sustrik's avatar
Martin Sustrik committed
73 74 75 76
        //  individual threads to find out how many distinct signals can be
        //  received.
        int thread_slot_count ();

77
        //  Send command from the source to the destination.
78
        void write (int source_, int destination_, const command_t &command_);
Martin Sustrik's avatar
Martin Sustrik committed
79

80
        //  Receive command from the source. Returns false if there is no
Martin Sustrik's avatar
Martin Sustrik committed
81
        //  command available.
82
        bool read (int source_,  int destination_, command_t *command_);
Martin Sustrik's avatar
Martin Sustrik committed
83 84 85 86 87

        //  Returns the I/O thread that is the least busy at the moment.
        //  Taskset specifies which I/O threads are eligible (0 = all).
        class io_thread_t *choose_io_thread (uint64_t taskset_);

Martin Sustrik's avatar
Martin Sustrik committed
88 89 90 91 92
        //  All pipes are registered with the dispatcher so that even the
        //  orphaned pipes can be deallocated on the terminal shutdown.
        void register_pipe (class pipe_t *pipe_);
        void unregister_pipe (class pipe_t *pipe_);

93 94 95 96 97
        //  Management of inproc endpoints.
        int register_endpoint (const char *addr_, class socket_base_t *socket_);
        void unregister_endpoints (class socket_base_t *socket_);
        class socket_base_t *find_endpoint (const char *addr_);

Martin Sustrik's avatar
Martin Sustrik committed
98 99
    private:

100 101
        ~dispatcher_t ();

102 103 104 105 106 107 108 109 110 111 112 113 114
        struct app_thread_info_t
        {
            //  If false, 0MQ application thread is free, there's no associated
            //  OS thread.
            bool associated;

            //  ID of the associated OS thread. If 'associated' is false,
            //  this field contains bogus data.
            thread_t::id_t tid;

            //  Pointer to the 0MQ application thread object.
            class app_thread_t *app_thread;
        };
Martin Sustrik's avatar
Martin Sustrik committed
115 116

        //  Application threads.
117
        typedef std::vector <app_thread_info_t> app_threads_t;
Martin Sustrik's avatar
Martin Sustrik committed
118 119
        app_threads_t app_threads;

120 121 122
        //  Synchronisation of accesses to shared application thread data.
        mutex_t app_threads_sync;

Martin Sustrik's avatar
Martin Sustrik committed
123 124 125 126 127 128 129 130 131 132 133 134 135 136
        //  I/O threads.
        typedef std::vector <class io_thread_t*> io_threads_t;
        io_threads_t io_threads;

        //  Signalers for both application and I/O threads.
        std::vector <i_signaler*> signalers;

        //  Pipe to hold the commands.
        typedef ypipe_t <command_t, true,
            command_pipe_granularity> command_pipe_t;

        //  NxN matrix of command pipes.
        command_pipe_t *command_pipes;

Martin Sustrik's avatar
Martin Sustrik committed
137 138 139 140 141 142 143 144 145
        //  As pipes may reside in orphaned state in particular moments
        //  of the pipe shutdown process, i.e. neither pipe reader nor
        //  pipe writer hold reference to the pipe, we have to hold references
        //  to all pipes in dispatcher so that we can deallocate them
        //  during terminal shutdown even though it conincides with the
        //  pipe being in the orphaned state.
        typedef std::set <class pipe_t*> pipes_t;
        pipes_t pipes;

146
        //  Synchronisation of access to the pipes repository.
Martin Sustrik's avatar
Martin Sustrik committed
147 148
        mutex_t pipes_sync;

149 150 151 152 153 154 155 156 157 158 159
        //  Number of sockets alive.
        int sockets;

        //  If true, zmq_term was already called. When last socket is closed
        //  the whole 0MQ infrastructure should be deallocated.
        bool terminated;

        //  Synchronisation of access to the termination data (socket count
        //  and 'terminated' flag).
        mutex_t term_sync;

160 161 162 163 164 165 166
        //  List of inproc endpoints within this context.
        typedef std::map <std::string, class socket_base_t*> endpoints_t;
        endpoints_t endpoints;

        //  Synchronisation of access to the list of inproc endpoints.
        mutex_t endpoints_sync;

167 168
        dispatcher_t (const dispatcher_t&);
        void operator = (const dispatcher_t&);
Martin Sustrik's avatar
Martin Sustrik committed
169 170 171 172 173 174
    };
    
}

#endif