ctx.cpp 17.1 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
31
#ifdef ZMQ_HAVE_WINDOWS
32 33 34 35 36
#include "windows.hpp"
#else
#include <unistd.h>
#endif

37
#include <limits>
38
#include <new>
39
#include <string.h>
40

41
#include "ctx.hpp"
42
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
43
#include "io_thread.hpp"
44
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45
#include "pipe.hpp"
46 47
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
48

49 50 51 52 53 54 55 56
#ifdef HAVE_LIBSODIUM
#ifdef HAVE_TWEETNACL
#include "randombytes.h"
#else
#include "sodium.h"
#endif
#endif

57 58 59
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

60 61
int clipped_maxsocket(int max_requested)
{
Richard Newton's avatar
Richard Newton committed
62
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
63 64
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
65

66 67 68
    return max_requested;
}

69
zmq::ctx_t::ctx_t () :
70
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
71 72 73 74 75
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
76
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
Pieter Hintjens's avatar
Pieter Hintjens committed
77
    io_thread_count (ZMQ_IO_THREADS_DFLT),
78
    blocky (true),
79 80 81
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
Martin Sustrik's avatar
Martin Sustrik committed
82
{
83 84 85
#ifdef HAVE_FORK
    pid = getpid();
#endif
86 87
}

88 89
bool zmq::ctx_t::check_tag ()
{
90
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
91 92
}

93
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
94
{
95
    //  Check that there are no remaining sockets.
96 97
    zmq_assert (sockets.empty ());

98 99
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
100
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
Martin Sustrik's avatar
Martin Sustrik committed
101
        io_threads [i]->stop ();
102
    }
Martin Sustrik's avatar
Martin Sustrik committed
103 104

    //  Wait till I/O threads actually terminate.
105 106 107
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        LIBZMQ_DELETE(io_threads [i]);
    }
Martin Sustrik's avatar
Martin Sustrik committed
108

109
    //  Deallocate the reaper thread object.
110
    LIBZMQ_DELETE(reaper);
111

112 113
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
114
    //  corresponding io_thread/socket objects.
115
    free (slots);
116

117 118 119 120 121 122
    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
#ifdef HAVE_LIBSODIUM
    randombytes_close();
#endif

123
    //  Remove the tag, so that the object is considered dead.
124
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
125 126
}

127
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
128
{
129 130 131 132 133 134 135 136
    // Connect up any pending inproc connections, otherwise we will hang
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }

137
    slot_sync.lock ();
138
    if (!starting) {
139

140
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
141
        if (pid != getpid ()) {
142 143 144
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
145
                sockets [i]->get_mailbox ()->forked ();
146

Martin Hurton's avatar
Martin Hurton committed
147
            term_mailbox.forked ();
148 149
        }
#endif
150

151 152 153
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
154
        terminating = true;
155

156 157 158 159 160 161 162 163 164 165
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
166
        slot_sync.unlock();
167 168 169 170 171 172

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
173
        errno_assert (rc == 0);
174 175 176 177
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
178
    slot_sync.unlock ();
179

180 181
    //  Deallocate the resources.
    delete this;
182

183 184
    return 0;
}
185

186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
int zmq::ctx_t::shutdown ()
{
    slot_sync.lock ();
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }
    slot_sync.unlock ();

    return 0;
}

205 206 207
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
208 209
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
210 211 212 213 214
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
215
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
216 217 218 219
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
220 221 222
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        opt_sync.lock ();
223
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
224 225
        opt_sync.unlock ();
    }
226 227 228 229 230 231 232 233 234 235 236 237
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        opt_sync.lock();
        thread_priority = optval_;
        opt_sync.unlock();
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        opt_sync.lock();
        thread_sched_policy = optval_;
        opt_sync.unlock();
    }
238 239 240 241 242 243
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        opt_sync.lock ();
        blocky = (optval_ != 0);
        opt_sync.unlock ();
    }
244 245 246 247 248 249 250 251 252 253 254 255 256
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
257
    if (option_ == ZMQ_SOCKET_LIMIT)
258
        rc = clipped_maxsocket (65535);
259
    else
260 261
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
262 263 264
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
265 266 267
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
268 269 270 271 272 273 274
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

275 276
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
277
    slot_sync.lock ();
278 279 280 281
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
Pieter Hintjens's avatar
Pieter Hintjens committed
282
        //  zmq_ctx_term thread and reaper thread.
283 284 285 286 287
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
somdoron's avatar
somdoron committed
288
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
289 290
        alloc_assert (slots);

Pieter Hintjens's avatar
Pieter Hintjens committed
291
        //  Initialise the infrastructure for zmq_ctx_term thread.
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
317
    //  Once zmq_ctx_term() was called, we can't create new sockets.
318 319 320 321 322
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }
323

324 325 326 327 328
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
329
    }
330

331 332 333
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
334

335 336 337
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

338
    //  Create the socket and register its mailbox.
339
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
340 341 342
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
343
        return NULL;
344 345
    }
    sockets.push_back (s);
346
    slots [slot] = s->get_mailbox ();
347

348
    slot_sync.unlock ();
349
    return s;
Martin Sustrik's avatar
Martin Sustrik committed
350 351
}

352
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
353
{
354 355
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
356
    //  Free the associated thread slot.
357 358
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
359
    slots [tid] = NULL;
360

361 362 363
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
364
    //  If zmq_ctx_term() was already called and there are no more socket
365 366 367
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
368 369

    slot_sync.unlock ();
370 371
}

372 373 374 375 376
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

377 378 379 380 381 382
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}

Martin Sustrik's avatar
Martin Sustrik committed
383
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
384
{
Martin Sustrik's avatar
Martin Sustrik committed
385
    slots [tid_]->send (command_);
386 387
}

388
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
389
{
390 391 392
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
393
    //  Find the I/O thread with minimum load.
394
    int min_load = -1;
395
    io_thread_t *selected_io_thread = NULL;
396 397
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
398
            int load = io_threads [i]->get_load ();
399
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
400
                min_load = load;
401
                selected_io_thread = io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
402 403 404
            }
        }
    }
405
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
406
}
Martin Sustrik's avatar
Martin Sustrik committed
407

408 409
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
410 411 412
{
    endpoints_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
413 414
    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;
415 416 417

    endpoints_sync.unlock ();

418 419 420 421 422 423 424
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        endpoints_sync.unlock ();
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    endpoints_sync.unlock ();

    return 0;
}

445
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
446 447 448 449 450
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
451
        if (it->second.socket == socket_) {
452
            endpoints_t::iterator to_erase = it;
453
            ++it;
454 455 456
            endpoints.erase (to_erase);
            continue;
        }
457
        ++it;
458
    }
Martin Hurton's avatar
Martin Hurton committed
459

460 461 462
    endpoints_sync.unlock ();
}

463
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
464 465 466 467 468 469 470
{
     endpoints_sync.lock ();

     endpoints_t::iterator it = endpoints.find (addr_);
     if (it == endpoints.end ()) {
         endpoints_sync.unlock ();
         errno = ECONNREFUSED;
471 472
         endpoint_t empty = {NULL, options_t()};
         return empty;
473
     }
474
     endpoint_t endpoint = it->second;
475 476 477

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
478 479
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
480
     endpoint.socket->inc_seqnum ();
481 482

     endpoints_sync.unlock ();
483
     return endpoint;
484
}
485

Martin Hurton's avatar
Martin Hurton committed
486 487
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
488
{
Martin Hurton's avatar
Martin Hurton committed
489 490 491
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

492 493
    endpoints_sync.lock ();

494
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
495
    if (it == endpoints.end ()) {
496
        // Still no bind.
Martin Hurton's avatar
Martin Hurton committed
497 498
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
499 500 501
    }
    else
        // Bind has happened in the mean time, connect directly
Martin Hurton's avatar
Martin Hurton committed
502
        connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
503 504 505 506

    endpoints_sync.unlock ();
}

507
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
508 509 510
{
    endpoints_sync.lock ();

511
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
512

513
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
514 515 516 517 518 519
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
    endpoints_sync.unlock ();
}

Pieter Hintjens's avatar
Pieter Hintjens committed
520
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
521
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
522 523
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
524
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
525

526 527 528 529 530 531 532 533
    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

534 535 536 537 538 539 540
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

541 542 543 544 545 546 547 548 549 550 551
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
552 553 554 555 556 557 558 559 560 561

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
562 563 564 565 566 567 568 569 570 571

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
572 573 574
    }
}

575 576 577 578
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;