/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
23
#ifdef ZMQ_HAVE_WINDOWS
24 25 26 27 28
#include "windows.hpp"
#else
#include <unistd.h>
#endif

29
#include <new>
30
#include <string.h>
31

32
#include "ctx.hpp"
33
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
34
#include "io_thread.hpp"
35
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36
#include "pipe.hpp"
37 38
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
39

40 41 42 43 44 45 46 47 48
//  Builds a context in its initial, not-yet-started state. Nothing is
//  allocated here: the reaper pointer and the slot array start out NULL
//  and the socket/I/O-thread limits take their compile-time defaults.
//  The actual infrastructure is set up on the first create_socket () call.
zmq::ctx_t::ctx_t () :
    tag (0xabadcafe),                       //  Marks the object as alive.
    starting (true),                        //  No socket created so far.
    terminating (false),                    //  terminate () not called yet.
    reaper (NULL),
    slot_count (0),
    slots (NULL),
    max_sockets (ZMQ_MAX_SOCKETS_DFLT),
    io_thread_count (ZMQ_IO_THREADS_DFLT)
{
}

52 53
bool zmq::ctx_t::check_tag ()
{
54
    return tag == 0xabadcafe;
55 56
}

57
//  Tears the context down: stops and destroys the I/O threads, destroys
//  the reaper, releases the slot array and finally invalidates the tag.
//  All sockets must already be gone when the destructor runs.
zmq::ctx_t::~ctx_t ()
{
    //  Every socket has to be closed before the context is destroyed.
    zmq_assert (sockets.empty ());

    //  Pass 1: send the stop signal to each I/O thread. This must happen
    //  before any thread object is destroyed, otherwise destroying a
    //  thread that was never told to stop would hang.
    for (io_threads_t::size_type n = 0; n != io_threads.size (); ++n)
        io_threads [n]->stop ();

    //  Pass 2: destroy the thread objects, which waits for the threads to
    //  actually finish.
    for (io_threads_t::size_type n = 0; n != io_threads.size (); ++n)
        delete io_threads [n];

    //  Destroy the reaper thread object. Deleting a NULL pointer is a
    //  no-op, which covers a context that was never started.
    delete reaper;

    //  Release the mailbox array. The mailboxes themselves were destroyed
    //  along with their owning io_thread/socket objects, so only the array
    //  is freed here. free (NULL) is a no-op.
    free (slots);

    //  Overwrite the tag so that the object is recognised as dead.
    tag = 0xdeadbeef;
}

85
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
86
{
87
    slot_sync.lock ();
88
    if (!starting) {
89

90 91 92
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
93 94
        terminating = true;
        slot_sync.unlock ();
95

96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
        //  First attempt to terminate the context.
        if (!restarted) {

            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            slot_sync.lock ();
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
            slot_sync.unlock ();
        }

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
        zmq_assert (rc == 0);
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
120
    slot_sync.unlock ();
121

122 123
    //  Deallocate the resources.
    delete this;
124

125 126
    return 0;
}
127

128 129 130
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
131
    if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1) {
132 133 134 135 136
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
137
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

164 165
//  Creates a socket of the given type inside this context.
//
//  The first call also starts the context: it allocates the mailbox slot
//  array, launches the reaper thread and the configured number of I/O
//  threads, and builds the list of free slots available to sockets.
//
//  Returns the new socket, or NULL on failure. errno is ETERM when the
//  context is already terminating and EMFILE when no free slot is left;
//  when socket_base_t::create () itself fails, errno is left as that call
//  set it.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();

    if (unlikely (starting)) {

        starting = false;

        //  Take a consistent snapshot of the configurable limits.
        opt_sync.lock ();
        const int socket_limit = max_sockets;
        const int io_count = io_thread_count;
        opt_sync.unlock ();

        //  Allocate the array of mailboxes. Two slots beyond the sockets
        //  and I/O threads are needed: one for the zmq_term thread and one
        //  for the reaper thread.
        slot_count = socket_limit + io_count + 2;
        slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
        alloc_assert (slots);

        //  Register the mailbox used by the zmq_term thread.
        slots [term_tid] = &term_mailbox;

        //  Create and launch the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create and launch the I/O threads. They take the slots starting
        //  at index 2.
        const int first_socket_slot = io_count + 2;
        for (int tid = 2; tid < first_socket_slot; ++tid) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, tid);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [tid] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  Everything above the I/O thread slots is free for sockets.
        //  Record those slots from the highest index down to the lowest.
        int32_t free_tid = (int32_t) slot_count;
        while (free_tid > (int32_t) first_socket_slot) {
            --free_tid;
            empty_slots.push_back (free_tid);
            slots [free_tid] = NULL;
        }
    }

    //  No new sockets may be created after zmq_term() has been called.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    //  The max_sockets limit has been reached.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    //  Reserve a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();

    //  Produce a fresh unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

    //  Create the socket; on failure hand the reserved slot back.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }

    //  Track the socket and register its mailbox in the reserved slot.
    sockets.push_back (s);
    slots [slot] = s->get_mailbox ();

    slot_sync.unlock ();
    return s;
}

241
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
242
{
243 244
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
245
    //  Free the associated thread slot.
246 247
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
248
    slots [tid] = NULL;
249

250 251 252 253 254 255 256
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

    //  If zmq_term() was already called and there are no more socket
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
257 258

    slot_sync.unlock ();
259 260
}

261 262 263 264 265
//  Returns the reaper thread object as a generic object_t pointer. The
//  pointer is NULL until the first create_socket () call has created the
//  reaper.
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    object_t *const reaper_object = reaper;
    return reaper_object;
}

Martin Sustrik's avatar
Martin Sustrik committed
266
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
267
{
Martin Sustrik's avatar
Martin Sustrik committed
268
    slots [tid_]->send (command_);
269 270
}

271
//  Picks the least loaded I/O thread permitted by the affinity mask.
//
//  affinity_  Bitmask of eligible I/O threads: bit N set means the thread
//             with index N may be chosen. Zero means any thread may be
//             chosen.
//
//  Returns the eligible thread with the smallest load (the first such
//  thread on ties), or NULL when there are no I/O threads or none of them
//  matches the mask.
//
//  The mask has 64 bits, so threads with index 64 and above can only be
//  selected when affinity_ is zero. Their index is never used as a shift
//  count, because shifting a 64-bit value by 64 or more is undefined
//  behaviour.
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
    if (io_threads.empty ())
        return NULL;

    //  Number of threads addressable through the affinity mask.
    const io_threads_t::size_type mask_bits = 64;

    //  Find the I/O thread with minimum load.
    int min_load = -1;
    io_thread_t *selected_io_thread = NULL;
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        const bool eligible = !affinity_
            || (i < mask_bits && (affinity_ & (uint64_t (1) << i)));
        if (!eligible)
            continue;

        int load = io_threads [i]->get_load ();
        if (selected_io_thread == NULL || load < min_load) {
            min_load = load;
            selected_io_thread = io_threads [i];
        }
    }
    return selected_io_thread;
}
Martin Sustrik's avatar
Martin Sustrik committed
290

291
int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
292 293 294
{
    endpoints_sync.lock ();

295
    bool inserted = endpoints.insert (endpoints_t::value_type (
296
        std::string (addr_), endpoint_)).second;
297 298 299

    endpoints_sync.unlock ();

300 301 302 303 304 305 306 307
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }

    return 0;
}

308
//  Removes every registered endpoint that belongs to the given socket.
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
    endpoints_sync.lock ();

    for (endpoints_t::iterator pos = endpoints.begin ();
          pos != endpoints.end ();) {
        //  The post-increment moves the iterator on before the entry it
        //  pointed at is erased, so the loop never holds an invalidated
        //  iterator.
        if (pos->second.socket == socket_)
            endpoints.erase (pos++);
        else
            ++pos;
    }

    endpoints_sync.unlock ();
}

326
//  Looks up the endpoint registered under the given address.
//
//  On success a copy of the endpoint is returned and the sequence number
//  of its socket is incremented. When no endpoint is registered under the
//  address, errno is set to ECONNREFUSED and an endpoint whose socket
//  pointer is NULL is returned.
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
    endpoints_sync.lock ();

    endpoints_t::iterator match = endpoints.find (addr_);
    if (match != endpoints.end ()) {
        endpoint_t found = match->second;

        //  Bump the command sequence number of the peer so that it is not
        //  deallocated before the caller issues the "bind" command. That
        //  subsequent 'bind' must be sent with inc_seqnum set to false,
        //  otherwise the seqnum would be incremented twice.
        found.socket->inc_seqnum ();

        endpoints_sync.unlock ();
        return found;
    }

    //  Nothing is registered under this address.
    endpoints_sync.unlock ();
    errno = ECONNREFUSED;
    endpoint_t none = {NULL, options_t()};
    return none;
}
348 349 350 351 352

//  Definition of the static counter holding the most recently issued socket
//  ID, or 0 if no socket has been created so far. Because this is a single
//  variable shared by all contexts rather than a per-context member,
//  sockets created in different contexts still receive unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;