/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
23
#ifdef ZMQ_HAVE_WINDOWS
24 25 26 27 28
#include "windows.hpp"
#else
#include <unistd.h>
#endif

29
#include <new>
30
#include <string.h>
31

32
#include "ctx.hpp"
33
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
34
#include "io_thread.hpp"
35
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "pipe.hpp"
37 38
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
39

40 41 42 43 44 45 46 47
//  Builds a context in its initial 'starting' state. Nothing is allocated
//  here: the reaper pointer and the slot array stay NULL, and the limits
//  are set to their compile-time defaults. The tag written here is the
//  value check_tag () compares against.
zmq::ctx_t::ctx_t () :
    tag (0xabadcafe),
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
    max_sockets (ZMQ_MAX_SOCKETS_DFLT),
    io_thread_count (ZMQ_IO_THREADS_DFLT),
    monitor_fn (NULL)
{
    //  All state is set up by the initialiser list above.
}

53 54
bool zmq::ctx_t::check_tag ()
{
55
    return tag == 0xabadcafe;
56 57
}

58
//  Tears the context down: stops and deletes the I/O threads, deletes the
//  reaper, releases the slot array and finally invalidates the tag.
//  All sockets must be gone by the time this runs.
zmq::ctx_t::~ctx_t ()
{
    //  No socket may outlive the context.
    zmq_assert (sockets.empty ());

    const io_threads_t::size_type thread_count = io_threads.size ();

    //  Send the stop signal to every I/O thread first. Without it the
    //  deletion below would hang.
    for (io_threads_t::size_type n = 0; n != thread_count; n++)
        io_threads [n]->stop ();

    //  Now destroy the I/O thread objects; this waits till the threads
    //  actually terminate.
    for (io_threads_t::size_type n = 0; n != thread_count; n++)
        delete io_threads [n];

    //  Destroy the reaper thread object. Deleting a NULL pointer is
    //  a no-op, so no check is needed.
    delete reaper;

    //  Release the array of mailbox pointers. The mailboxes themselves
    //  were deallocated with their io_thread/socket objects. Freeing
    //  a NULL pointer is a no-op as well.
    free (slots);

    //  Mark the object as dead.
    tag = 0xdeadbeef;
}

86
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
87
{
88
    slot_sync.lock ();
89
    if (!starting) {
90

91 92 93
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
94 95
        terminating = true;
        slot_sync.unlock ();
96

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
        //  First attempt to terminate the context.
        if (!restarted) {

            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            slot_sync.lock ();
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
            slot_sync.unlock ();
        }

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
116
        errno_assert (rc == 0);
117 118 119 120
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
121
    slot_sync.unlock ();
122

123 124
    //  Deallocate the resources.
    delete this;
125

126 127
    return 0;
}
128

129 130 131 132 133 134
//  Stores the callback that monitor_event () invokes. A NULL callback
//  switches monitoring off, since monitor_event () does nothing when no
//  callback is set. Always returns 0.
int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_)
{
    this->monitor_fn = monitor_;

    return 0;
}

135 136 137
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
138
    if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1) {
139 140 141 142 143
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
144
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

171 172
//  Creates a socket of the given type and registers its mailbox.
//
//  The first call also brings up the context infrastructure: the slot
//  array, the reaper thread and the I/O threads.
//
//  Returns the new socket, or NULL on failure. errno is set to ETERM when
//  the context is terminating and to EMFILE when no free slot is left.
//  When socket_base_t::create () itself fails, NULL is returned and errno
//  is not modified here.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();

    if (unlikely (starting)) {

        starting = false;

        //  Take a snapshot of the configurable limits.
        opt_sync.lock ();
        const int socket_limit = max_sockets;
        const int io_count = io_thread_count;
        opt_sync.unlock ();

        //  Initialise the array of mailboxes. There is one slot per socket
        //  and per I/O thread, plus two additional slots: one for the
        //  zmq_term thread and one for the reaper thread.
        slot_count = socket_limit + io_count + 2;
        slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_term thread.
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them. They occupy the
        //  slots right after the two reserved ones.
        const int first_socket_slot = io_count + 2;
        for (int tid = 2; tid != first_socket_slot; tid++) {
            io_thread_t *thread = new (std::nothrow) io_thread_t (this, tid);
            alloc_assert (thread);
            io_threads.push_back (thread);
            slots [tid] = thread->get_mailbox ();
            thread->start ();
        }

        //  The rest of the slot array is unused. Put it on the list of
        //  empty slots, highest index first.
        for (int32_t unused = (int32_t) slot_count - 1;
              unused >= (int32_t) first_socket_slot; unused--) {
            empty_slots.push_back (unused);
            slots [unused] = NULL;
        }
    }

    //  Once zmq_term() was called, we can't create new sockets.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();

    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

    //  Create the socket. On success register it together with its
    //  mailbox; on failure hand the slot back.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (s) {
        sockets.push_back (s);
        slots [slot] = s->get_mailbox ();
    }
    else
        empty_slots.push_back (slot);

    slot_sync.unlock ();
    return s;
}

248
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
249
{
250 251
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
252
    //  Free the associated thread slot.
253 254
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
255
    slots [tid] = NULL;
256

257 258 259 260 261 262 263
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

    //  If zmq_term() was already called and there are no more socket
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
264 265

    slot_sync.unlock ();
266 267
}

268 269 270 271 272
//  Returns the reaper thread object. The pointer is NULL until
//  create_socket () has created the reaper.
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    zmq::object_t *const reaper_object = reaper;
    return reaper_object;
}

Martin Sustrik's avatar
Martin Sustrik committed
273
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
274
{
Martin Sustrik's avatar
Martin Sustrik committed
275
    slots [tid_]->send (command_);
276 277
}

278
//  Picks the least loaded I/O thread among those allowed by the affinity
//  mask.
//
//  Bit N of affinity_ selects the N-th I/O thread; a zero mask allows all
//  threads. Returns NULL if there are no I/O threads or if none of them is
//  allowed by the mask. Among threads with equal load the one with the
//  lowest index wins.
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (io_threads.empty ())
        return NULL;

    //  Number of I/O threads addressable by the 64-bit affinity mask.
    const io_threads_t::size_type mask_bits = 64;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {

        //  Shifting a 64-bit value by 64 or more bits is undefined, so
        //  threads beyond the width of the mask are never matched by
        //  a non-zero mask.
        const bool allowed = !affinity_ ||
            (i < mask_bits && (affinity_ & (uint64_t (1) << i)));
        if (!allowed)
            continue;

        int load = io_threads [i]->get_load ();
        if (selected_io_thread == NULL || load < min_load) {
            min_load = load;
            selected_io_thread = io_threads [i];
        }
    }
    return selected_io_thread;
}
Martin Sustrik's avatar
Martin Sustrik committed
297

298
int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
299 300 301
{
    endpoints_sync.lock ();

302
    bool inserted = endpoints.insert (endpoints_t::value_type (
303
        std::string (addr_), endpoint_)).second;
304 305 306

    endpoints_sync.unlock ();

307 308 309 310 311 312 313 314
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }

    return 0;
}

315
//  Removes every endpoint registered by the given socket from the
//  endpoint table.
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
    endpoints_sync.lock ();

    for (endpoints_t::iterator it = endpoints.begin ();
          it != endpoints.end ();) {

        //  The iterator is advanced before the element is erased so that
        //  it never refers to a removed entry.
        if (it->second.socket == socket_)
            endpoints.erase (it++);
        else
            ++it;
    }

    endpoints_sync.unlock ();
}

333
//  Looks up the endpoint registered under the given address.
//
//  On success a copy of the endpoint is returned and the sequence number
//  of its socket is incremented. If the address is unknown, an endpoint
//  with a NULL socket is returned and errno is set to ECONNREFUSED.
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator match = endpoints.find (addr_);
    if (match != endpoints.end ()) {
        endpoint_t endpoint = match->second;

        //  Increment the command sequence number of the peer so that it
        //  won't get deallocated until "bind" command is issued by the
        //  caller. The subsequent 'bind' has to be called with inc_seqnum
        //  parameter set to false, so that the seqnum isn't incremented
        //  twice.
        endpoint.socket->inc_seqnum ();

        endpoints_sync.unlock ();
        return endpoint;
    }
    else {
        //  Nobody is bound to this address.
        endpoints_sync.unlock ();
        errno = ECONNREFUSED;
        endpoint_t empty = {NULL, options_t()};
        return empty;
    }
}
355

356
//  Reports a socket event to the registered monitor callback; does nothing
//  when no callback is installed.
//
//  For every known event args_ must hold an address (char*) followed by
//  an int. The int is stored as a file descriptor, an error code or
//  a retry interval, depending on the event. An unknown event triggers an
//  assertion.
void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_)
{
    //  Nobody is listening.
    if (monitor_fn == NULL)
        return;

    zmq_event_data_t data;
    memset(&data, 0, sizeof (zmq_event_data_t));

    switch (event_) {

    //  Events on the connecting side.
    case ZMQ_EVENT_CONNECTED:
        data.connected.addr = va_arg (args_, char*);
        data.connected.fd = va_arg (args_, int);
        break;
    case ZMQ_EVENT_CONNECT_DELAYED:
        data.connect_delayed.addr = va_arg (args_, char*);
        data.connect_delayed.err = va_arg (args_, int);
        break;
    case ZMQ_EVENT_CONNECT_RETRIED:
        data.connect_retried.addr = va_arg (args_, char*);
        data.connect_retried.interval = va_arg (args_, int);
        break;

    //  Events on the listening side.
    case ZMQ_EVENT_LISTENING:
        data.listening.addr = va_arg (args_, char*);
        data.listening.fd = va_arg (args_, int);
        break;
    case ZMQ_EVENT_BIND_FAILED:
        data.bind_failed.addr = va_arg (args_, char*);
        data.bind_failed.err = va_arg (args_, int);
        break;
    case ZMQ_EVENT_ACCEPTED:
        data.accepted.addr = va_arg (args_, char*);
        data.accepted.fd = va_arg (args_, int);
        break;
    case ZMQ_EVENT_ACCEPT_FAILED:
        data.accept_failed.addr = va_arg (args_, char*);
        data.accept_failed.err = va_arg (args_, int);
        break;

    //  Close and disconnect events.
    case ZMQ_EVENT_CLOSED:
        data.closed.addr = va_arg (args_, char*);
        data.closed.fd = va_arg (args_, int);
        break;
    case ZMQ_EVENT_CLOSE_FAILED:
        data.close_failed.addr = va_arg (args_, char*);
        data.close_failed.err = va_arg (args_, int);
        break;
    case ZMQ_EVENT_DISCONNECTED:
        data.disconnected.addr = va_arg (args_, char*);
        data.disconnected.fd = va_arg (args_, int);
        break;

    default:
        zmq_assert (false);
    }

    monitor_fn ((void *)socket_, event_, &data);
}

409 410 411 412
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a static member, i.e. a single counter shared by all contexts. Thus,
//  even sockets created in different contexts have unique IDs.
//  create_socket () derives every new socket ID from this counter by calling
//  add (1) on it.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;