ctx.cpp 17 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "platform.hpp"
Martin Hurton's avatar
Martin Hurton committed
31
#ifdef ZMQ_HAVE_WINDOWS
32 33 34 35 36
#include "windows.hpp"
#else
#include <unistd.h>
#endif

37
#include <limits>
38
#include <new>
39
#include <string.h>
40

41
#include "ctx.hpp"
42
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
43
#include "io_thread.hpp"
44
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45
#include "pipe.hpp"
46 47
#include "err.hpp"
#include "msg.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
48

49 50 51 52 53 54 55 56
#ifdef HAVE_LIBSODIUM
#ifdef HAVE_TWEETNACL
#include "randombytes.h"
#else
#include "sodium.h"
#endif
#endif

57 58 59
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD  0xdeadbeef

60 61
int clipped_maxsocket(int max_requested)
{
Richard Newton's avatar
Richard Newton committed
62
    if (max_requested >= zmq::poller_t::max_fds () && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
63 64
        // -1 because we need room for the reaper mailbox.
        max_requested = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
65

66 67 68
    return max_requested;
}

69
zmq::ctx_t::ctx_t () :
70
    tag (ZMQ_CTX_TAG_VALUE_GOOD),
71 72 73 74 75
    starting (true),
    terminating (false),
    reaper (NULL),
    slot_count (0),
    slots (NULL),
Richard Newton's avatar
Richard Newton committed
76
    max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
Pieter Hintjens's avatar
Pieter Hintjens committed
77
    io_thread_count (ZMQ_IO_THREADS_DFLT),
78
    blocky (true),
79 80 81
    ipv6 (false),
    thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
Martin Sustrik's avatar
Martin Sustrik committed
82
{
83 84 85
#ifdef HAVE_FORK
    pid = getpid();
#endif
86 87
}

88 89
bool zmq::ctx_t::check_tag ()
{
90
    return tag == ZMQ_CTX_TAG_VALUE_GOOD;
91 92
}

93
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
94
{
95
    //  Check that there are no remaining sockets.
96 97
    zmq_assert (sockets.empty ());

98 99
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
Martin Sustrik's avatar
Martin Sustrik committed
100 101 102 103 104
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
        io_threads [i]->stop ();

    //  Wait till I/O threads actually terminate.
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
105
        delete io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
106

107
    //  Deallocate the reaper thread object.
108
    delete reaper;
109

110 111
    //  Deallocate the array of mailboxes. No special work is
    //  needed as mailboxes themselves were deallocated with their
112
    //  corresponding io_thread/socket objects.
113
    free (slots);
114

115 116 117 118 119 120
    //  If we've done any Curve encryption, we may have a file handle
    //  to /dev/urandom open that needs to be cleaned up.
#ifdef HAVE_LIBSODIUM
    randombytes_close();
#endif

121
    //  Remove the tag, so that the object is considered dead.
122
    tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
123 124
}

125
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
126
{
127 128 129 130 131 132 133 134
    // Connect up any pending inproc connections, otherwise we will hang
    pending_connections_t copy = pending_connections;
    for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        s->bind (p->first.c_str ());
        s->close ();
    }

135
    slot_sync.lock ();
136
    if (!starting) {
137

138
#ifdef HAVE_FORK
Pieter Hintjens's avatar
Pieter Hintjens committed
139
        if (pid != getpid ()) {
140 141 142
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
Martin Hurton's avatar
Martin Hurton committed
143
                sockets [i]->get_mailbox ()->forked ();
144

Martin Hurton's avatar
Martin Hurton committed
145
            term_mailbox.forked ();
146 147
        }
#endif
148

149 150 151
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = terminating;
152
        terminating = true;
153

154 155 156 157 158 159 160 161 162 163
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != sockets.size (); i++)
                sockets [i]->stop ();
            if (sockets.empty ())
                reaper->stop ();
        }
164
        slot_sync.unlock();
165 166 167 168 169 170

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
171
        errno_assert (rc == 0);
172 173 174 175
        zmq_assert (cmd.type == command_t::done);
        slot_sync.lock ();
        zmq_assert (sockets.empty ());
    }
176
    slot_sync.unlock ();
177

178 179
    //  Deallocate the resources.
    delete this;
180

181 182
    return 0;
}
183

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
int zmq::ctx_t::shutdown ()
{
    slot_sync.lock ();
    if (!starting && !terminating) {
        terminating = true;

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
        for (sockets_t::size_type i = 0; i != sockets.size (); i++)
            sockets [i]->stop ();
        if (sockets.empty ())
            reaper->stop ();
    }
    slot_sync.unlock ();

    return 0;
}

203 204 205
int zmq::ctx_t::set (int option_, int optval_)
{
    int rc = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
206 207
    if (option_ == ZMQ_MAX_SOCKETS
    &&  optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
208 209 210 211 212
        opt_sync.lock ();
        max_sockets = optval_;
        opt_sync.unlock ();
    }
    else
213
    if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
214 215 216 217
        opt_sync.lock ();
        io_thread_count = optval_;
        opt_sync.unlock ();
    }
Pieter Hintjens's avatar
Pieter Hintjens committed
218 219 220
    else
    if (option_ == ZMQ_IPV6 && optval_ >= 0) {
        opt_sync.lock ();
221
        ipv6 = (optval_ != 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
222 223
        opt_sync.unlock ();
    }
224 225 226 227 228 229 230 231 232 233 234 235
    else
    if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
        opt_sync.lock();
        thread_priority = optval_;
        opt_sync.unlock();
    }
    else
    if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
        opt_sync.lock();
        thread_sched_policy = optval_;
        opt_sync.unlock();
    }
236 237 238 239 240 241
    else
    if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
        opt_sync.lock ();
        blocky = (optval_ != 0);
        opt_sync.unlock ();
    }
242 243 244 245 246 247 248 249 250 251 252 253 254
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

int zmq::ctx_t::get (int option_)
{
    int rc = 0;
    if (option_ == ZMQ_MAX_SOCKETS)
        rc = max_sockets;
    else
255
    if (option_ == ZMQ_SOCKET_LIMIT)
256
        rc = clipped_maxsocket (65535);
257
    else
258 259
    if (option_ == ZMQ_IO_THREADS)
        rc = io_thread_count;
Pieter Hintjens's avatar
Pieter Hintjens committed
260 261 262
    else
    if (option_ == ZMQ_IPV6)
        rc = ipv6;
263 264 265
    else
    if (option_ == ZMQ_BLOCKY)
        rc = blocky;
266 267 268 269 270 271 272
    else {
        errno = EINVAL;
        rc = -1;
    }
    return rc;
}

273 274
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
275
    slot_sync.lock ();
276 277 278 279
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
Pieter Hintjens's avatar
Pieter Hintjens committed
280
        //  zmq_ctx_term thread and reaper thread.
281 282 283 284 285
        opt_sync.lock ();
        int mazmq = max_sockets;
        int ios = io_thread_count;
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2;
somdoron's avatar
somdoron committed
286
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
287 288
        alloc_assert (slots);

Pieter Hintjens's avatar
Pieter Hintjens committed
289
        //  Initialise the infrastructure for zmq_ctx_term thread.
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
        slots [term_tid] = &term_mailbox;

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid);
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread);
            slots [i] = io_thread->get_mailbox ();
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i);
            slots [i] = NULL;
        }
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
315
    //  Once zmq_ctx_term() was called, we can't create new sockets.
316 317 318 319 320
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }
321

322 323 324 325 326
    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
327
    }
328

329 330 331
    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back ();
    empty_slots.pop_back ();
332

333 334 335
    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1;

336
    //  Create the socket and register its mailbox.
337
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
338 339 340
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
341
        return NULL;
342 343
    }
    sockets.push_back (s);
344
    slots [slot] = s->get_mailbox ();
345

346
    slot_sync.unlock ();
347
    return s;
Martin Sustrik's avatar
Martin Sustrik committed
348 349
}

350
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
351
{
352 353
    slot_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
354
    //  Free the associated thread slot.
355 356
    uint32_t tid = socket_->get_tid ();
    empty_slots.push_back (tid);
Martin Hurton's avatar
Martin Hurton committed
357
    slots [tid] = NULL;
358

359 360 361
    //  Remove the socket from the list of sockets.
    sockets.erase (socket_);

Pieter Hintjens's avatar
Pieter Hintjens committed
362
    //  If zmq_ctx_term() was already called and there are no more socket
363 364 365
    //  we can ask reaper thread to terminate.
    if (terminating && sockets.empty ())
        reaper->stop ();
366 367

    slot_sync.unlock ();
368 369
}

370 371 372 373 374
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return reaper;
}

375 376 377 378 379 380
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}

Martin Sustrik's avatar
Martin Sustrik committed
381
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
382
{
Martin Sustrik's avatar
Martin Sustrik committed
383
    slots [tid_]->send (command_);
384 385
}

386
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
387
{
388 389 390
    if (io_threads.empty ())
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
391
    //  Find the I/O thread with minimum load.
392
    int min_load = -1;
393
    io_thread_t *selected_io_thread = NULL;
394 395
    for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
Martin Sustrik's avatar
Martin Sustrik committed
396
            int load = io_threads [i]->get_load ();
397
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
398
                min_load = load;
399
                selected_io_thread = io_threads [i];
Martin Sustrik's avatar
Martin Sustrik committed
400 401 402
            }
        }
    }
403
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
404
}
Martin Sustrik's avatar
Martin Sustrik committed
405

406 407
int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
408 409 410
{
    endpoints_sync.lock ();

Martin Hurton's avatar
Martin Hurton committed
411 412
    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;
413 414 415

    endpoints_sync.unlock ();

416 417 418 419 420 421 422
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

Martin Hurton's avatar
Martin Hurton committed
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
int zmq::ctx_t::unregister_endpoint (
        const std::string &addr_, socket_base_t *socket_)
{
    endpoints_sync.lock ();

    const endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end () || it->second.socket != socket_) {
        endpoints_sync.unlock ();
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
    endpoints.erase (it);

    endpoints_sync.unlock ();

    return 0;
}

443
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
444 445 446 447 448
{
    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.begin ();
    while (it != endpoints.end ()) {
449
        if (it->second.socket == socket_) {
450
            endpoints_t::iterator to_erase = it;
451
            ++it;
452 453 454
            endpoints.erase (to_erase);
            continue;
        }
455
        ++it;
456
    }
Martin Hurton's avatar
Martin Hurton committed
457

458 459 460
    endpoints_sync.unlock ();
}

461
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
462 463 464 465 466 467 468
{
     endpoints_sync.lock ();

     endpoints_t::iterator it = endpoints.find (addr_);
     if (it == endpoints.end ()) {
         endpoints_sync.unlock ();
         errno = ECONNREFUSED;
469 470
         endpoint_t empty = {NULL, options_t()};
         return empty;
471
     }
472
     endpoint_t endpoint = it->second;
473 474 475

     //  Increment the command sequence number of the peer so that it won't
     //  get deallocated until "bind" command is issued by the caller.
476 477
     //  The subsequent 'bind' has to be called with inc_seqnum parameter
     //  set to false, so that the seqnum isn't incremented twice.
478
     endpoint.socket->inc_seqnum ();
479 480

     endpoints_sync.unlock ();
481
     return endpoint;
482
}
483

Martin Hurton's avatar
Martin Hurton committed
484 485
void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
486
{
Martin Hurton's avatar
Martin Hurton committed
487 488 489
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

490 491
    endpoints_sync.lock ();

492
    endpoints_t::iterator it = endpoints.find (addr_);
Pieter Hintjens's avatar
Pieter Hintjens committed
493
    if (it == endpoints.end ()) {
494
        // Still no bind.
Martin Hurton's avatar
Martin Hurton committed
495 496
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
497 498 499
    }
    else
        // Bind has happened in the mean time, connect directly
Martin Hurton's avatar
Martin Hurton committed
500
        connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
501 502 503 504

    endpoints_sync.unlock ();
}

505
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
506 507 508
{
    endpoints_sync.lock ();

509
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
510

511
    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
512 513 514 515 516 517
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
    endpoints_sync.unlock ();
}

Pieter Hintjens's avatar
Pieter Hintjens committed
518
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
Martin Hurton's avatar
Martin Hurton committed
519
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
520 521
{
    bind_socket_->inc_seqnum();
Martin Hurton's avatar
Martin Hurton committed
522
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
523

524 525 526 527 528 529 530 531
    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

532 533 534 535 536 537 538
    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

539 540 541 542 543 544 545 546 547 548 549
    if (!conflate) {
        pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
    }
    else {
        pending_connection_.connect_pipe->set_hwms(-1, -1);
        pending_connection_.bind_pipe->set_hwms(-1, -1);
    }
550 551 552 553 554 555 556 557 558 559

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
560 561 562 563 564 565 566 567 568 569

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
570 571 572
    }
}

573 574 575 576
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;