dispatcher.cpp 9.08 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <new>

22
#include "../bindings/c/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
23

24
#include "dispatcher.hpp"
25
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
26 27 28 29 30 31
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "pipe.hpp"

Martin Sustrik's avatar
Martin Sustrik committed
32
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
33 34 35
#include "windows.h"
#endif

36 37
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
      int flags_) :
38 39
    sockets (0),
    terminated (false)
Martin Sustrik's avatar
Martin Sustrik committed
40
{
Martin Sustrik's avatar
Martin Sustrik committed
41
#ifdef ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
42 43 44 45 46
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.
    WORD version_requested = MAKEWORD (2, 2);
    WSADATA wsa_data;
    int rc = WSAStartup (version_requested, &wsa_data);
Martin Sustrik's avatar
Martin Sustrik committed
47 48
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
Martin Sustrik's avatar
Martin Sustrik committed
49 50 51 52 53
        HIBYTE (wsa_data.wVersion) == 2);
#endif

    //  Create application thread proxies.
    for (int i = 0; i != app_threads_; i++) {
54 55 56 57 58 59
        app_thread_info_t info;
        info.associated = false;
        info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
        zmq_assert (info.app_thread);
        app_threads.push_back (info);
        signalers.push_back (info.app_thread->get_signaler ());
Martin Sustrik's avatar
Martin Sustrik committed
60 61 62 63
    }

    //  Create I/O thread objects.
    for (int i = 0; i != io_threads_; i++) {
64 65
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
            i + app_threads_, flags_);
Martin Sustrik's avatar
Martin Sustrik committed
66
        zmq_assert (io_thread);
Martin Sustrik's avatar
Martin Sustrik committed
67 68 69 70
        io_threads.push_back (io_thread);
        signalers.push_back (io_thread->get_signaler ());
    }

71 72 73 74 75 76
    //  Create the administrative thread. Nothing special is needed. NULL
    //  is used instead of signaler given that as for now, administrative
    //  thread doesn't receive any commands. The only thing it is used for
    //  is sending 'stop' command to I/O threads on shutdown.
    signalers.push_back (NULL);

Martin Sustrik's avatar
Martin Sustrik committed
77
    //  Create command pipe matrix.
78 79
    command_pipes = new  (std::nothrow) command_pipe_t [signalers.size () *
         signalers.size ()];
Martin Sustrik's avatar
Martin Sustrik committed
80
    zmq_assert (command_pipes);
Martin Sustrik's avatar
Martin Sustrik committed
81 82 83 84 85 86

    //  Launch I/O threads.
    for (int i = 0; i != io_threads_; i++)
        io_threads [i]->start ();
}

87 88 89 90 91 92 93 94 95 96 97 98 99 100
int zmq::dispatcher_t::term ()
{
    term_sync.lock ();
    zmq_assert (!terminated);
    terminated = true;
    bool destroy = (sockets == 0);
    term_sync.unlock ();
    
    if (destroy)
        delete this;

    return 0;
}

101
zmq::dispatcher_t::~dispatcher_t ()
Martin Sustrik's avatar
Martin Sustrik committed
102
{
103 104
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
Martin Sustrik's avatar
Martin Sustrik committed
105 106 107 108 109
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
        io_threads [i]->stop ();

    //  Wait till I/O threads actually terminate.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
110
        delete io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
111

112 113
    //  Close all application theads, sockets, io_objects etc.
    for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
114
        delete app_threads [i].app_thread;
115

Martin Sustrik's avatar
Martin Sustrik committed
116
    //  Deallocate all the orphaned pipes.
117 118
    while (!pipes.empty ())
        delete *pipes.begin ();
Martin Sustrik's avatar
Martin Sustrik committed
119

Martin Sustrik's avatar
Martin Sustrik committed
120 121
    delete [] command_pipes;

Martin Sustrik's avatar
Martin Sustrik committed
122
#ifdef ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
123 124 125 126 127 128
    //  On Windows, uninitialise socket layer.
    int rc = WSACleanup ();
    wsa_assert (rc != SOCKET_ERROR);
#endif
}

129
int zmq::dispatcher_t::thread_slot_count ()
Martin Sustrik's avatar
Martin Sustrik committed
130 131 132 133
{
    return signalers.size ();
}

134
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
Martin Sustrik's avatar
Martin Sustrik committed
135
{
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
    app_threads_sync.lock ();

    //  Find whether the calling thread has app_thread_t object associated
    //  already. At the same time find an unused app_thread_t so that it can
    //  be used if there's no associated object for the calling thread.
    //  Check whether thread ID is already assigned. If so, return it.
    app_threads_t::size_type unused = app_threads.size ();
    app_threads_t::size_type current;
    for (current = 0; current != app_threads.size (); current++) {
        if (app_threads [current].associated &&
              thread_t::equal (thread_t::id (), app_threads [current].tid))
            break;
        if (!app_threads [current].associated)
            unused = current;
    }

    //  If no app_thread_t is associated with the calling thread,
    //  associate it with one of the unused app_thread_t objects.
    if (current == app_threads.size ()) {
        if (unused == app_threads.size ()) {
            app_threads_sync.unlock ();
            errno = EMTHREAD;
            return NULL;
        }
        app_threads [unused].associated = true;
        app_threads [unused].tid = thread_t::id ();
        current = unused;
Martin Sustrik's avatar
Martin Sustrik committed
163
    }
164 165 166

    app_thread_t *thread = app_threads [current].app_thread;
    app_threads_sync.unlock ();
167

168 169 170 171
    socket_base_t *s = thread->create_socket (type_);
    if (!s)
        return NULL;

172 173 174 175
    term_sync.lock ();
    sockets++;
    term_sync.unlock ();

176
    return s;
Martin Sustrik's avatar
Martin Sustrik committed
177 178
}

179 180 181 182 183 184 185 186 187 188 189 190 191 192
void zmq::dispatcher_t::destroy_socket ()
{
    //  If zmq_term was already called and there are no more sockets,
    //  terminate the whole 0MQ infrastructure.
    term_sync.lock ();
    zmq_assert (sockets > 0);
    sockets--;
    bool destroy = (sockets == 0 && terminated);
    term_sync.unlock ();

    if (destroy)
       delete this;
}

193 194 195 196 197 198 199 200 201 202 203 204 205
void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
{
    app_threads_sync.lock ();
    app_threads_t::size_type i;
    for (i = 0; i != app_threads.size (); i++)
        if (app_threads [i].app_thread == thread_) {
            app_threads [i].associated = false;
            break;
        }
    zmq_assert (i != app_threads.size ());
    app_threads_sync.unlock ();
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
void zmq::dispatcher_t::write (int source_, int destination_,
    const command_t &command_)
{
    command_pipe_t &pipe =
        command_pipes [source_ * signalers.size () + destination_];
    pipe.write (command_);
    if (!pipe.flush ())
        signalers [destination_]->signal (source_);
}

bool zmq::dispatcher_t::read (int source_,  int destination_,
    command_t *command_)
{
    return command_pipes [source_ * signalers.size () +
        destination_].read (command_);
}

223
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
224 225
{
    //  Find the I/O thread with minimum load.
226 227
    zmq_assert (io_threads.size () > 0);
    int min_load = -1;
Martin Sustrik's avatar
Martin Sustrik committed
228
    io_threads_t::size_type result = 0;
229 230
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
231
            int load = io_threads [i]->get_load ();
232
            if (min_load == -1 || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
233 234 235 236 237
                min_load = load;
                result = i;
            }
        }
    }
238
    zmq_assert (min_load != -1);
Martin Sustrik's avatar
Martin Sustrik committed
239 240
    return io_threads [result];
}
Martin Sustrik's avatar
Martin Sustrik committed
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256

void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
{
    pipes_sync.lock ();
    bool inserted = pipes.insert (pipe_).second;
    zmq_assert (inserted);
    pipes_sync.unlock ();
}

void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
{
    pipes_sync.lock ();
    pipes_t::size_type erased = pipes.erase (pipe_);
    zmq_assert (erased == 1);
    pipes_sync.unlock ();
}
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305

int zmq::dispatcher_t::register_endpoint (const char *addr_,
    socket_base_t *socket_)
{
    endpoints_sync.lock ();

    bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
    if (!inserted) {
        errno = EADDRINUSE;
        endpoints_sync.unlock ();
        return -1;
    }

    endpoints_sync.unlock ();
    return 0;
}

void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
        if (it->second == socket_) {
            endpoints_t::iterator to_erase = it;
            it++;
            endpoints.erase (to_erase);
            continue;
        }
        it++;
    }
        
    endpoints_sync.unlock ();
}

zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
{
     endpoints_sync.lock ();

     endpoints_t::iterator it = endpoints.find (addr_);
     if (it == endpoints.end ()) {
         endpoints_sync.unlock ();
         errno = ECONNREFUSED;
         return NULL;
     }
     socket_base_t *endpoint = it->second;

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
306 307
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
308 309 310 311 312 313
     endpoint->inc_seqnum ();

     endpoints_sync.unlock ();
     return endpoint;
}