ctx.cpp 25.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#ifndef ZMQ_HAVE_WINDOWS
33 34 35
#include <unistd.h>
#endif

36
#include <limits>
37
#include <climits>
38
#include <new>
f18m's avatar
f18m committed
39
#include <sstream>
40
#include <string.h>
41

42
#include "ctx.hpp"
43
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
44
#include "io_thread.hpp"
45
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "pipe.hpp"
47 48
#include "err.hpp"
#include "msg.hpp"
49
#include "random.hpp"
50

Ilya Kulakov's avatar
Ilya Kulakov committed
51 52 53 54
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

55 56 57 58
#ifdef ZMQ_USE_NSS
#include <nss.h>
#endif

59 60 61 62
#ifdef ZMQ_USE_GNUTLS
#include <gnutls/gnutls.h>
#endif

63
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
64
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
65

66
static int clipped_maxsocket (int max_requested_)
67
{
68
    if (max_requested_ >= zmq::poller_t::max_fds ()
69
        && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
70
        // -1 because we need room for the reaper mailbox.
71
        max_requested_ = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
72

73
    return max_requested_;
74 75
}

76
zmq::ctx_t::ctx_t () :
77 78 79 80 81 82 83 84 85 86
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
Martin Sustrik's avatar
Martin Sustrik committed
87
{
88
#ifdef HAVE_FORK
89
    _pid = getpid ();
90
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
91
#ifdef ZMQ_HAVE_VMCI
92 93
    _vmci_fd = -1;
    _vmci_family = -1;
Ilya Kulakov's avatar
Ilya Kulakov committed
94
#endif
95

96 97
    //  Initialise crypto library, if needed.
    zmq::random_open ();
98 99 100 101

#ifdef ZMQ_USE_NSS
    NSS_NoDB_Init (NULL);
#endif
102 103 104 105

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
106 107
}

108
bool zmq::ctx_t::check_tag () const
109
{
110
    return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
111 112
}

113
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
114
{
115 116
    //  Check that there are no remaining _sockets.
    zmq_assert (_sockets.empty ());
117

118 119
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
120 121
    const io_threads_t::size_type io_threads_size = _io_threads.size ();
    for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
122
        _io_threads[i]->stop ();
123
    }
Martin Sustrik's avatar
Martin Sustrik committed
124 125

    //  Wait till I/O threads actually terminate.
126
    for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
127
        LIBZMQ_DELETE (_io_threads[i]);
128
    }
Martin Sustrik's avatar
Martin Sustrik committed
129

130
    //  Deallocate the reaper thread object.
131
    LIBZMQ_DELETE (_reaper);
132

133
    //  The mailboxes in _slots themselves were deallocated with their
134
    //  corresponding io_thread/socket objects.
135

136 137
    //  De-initialise crypto library, if needed.
    zmq::random_close ();
138

139 140 141 142
#ifdef ZMQ_USE_NSS
    NSS_Shutdown ();
#endif

143 144 145 146
#ifdef ZMQ_USE_GNUTLS
    gnutls_global_deinit ();
#endif

147
    //  Remove the tag, so that the object is considered dead.
148
    _tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
149 150
}

151 152
bool zmq::ctx_t::valid () const
{
153
    return _term_mailbox.valid ();
154 155
}

156
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
157
{
158
    _slot_sync.lock ();
159

160
    const bool save_terminating = _terminating;
161
    _terminating = false;
162

163
    // Connect up any pending inproc connections, otherwise we will hang
164
    pending_connections_t copy = _pending_connections;
165 166
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
         p != end; ++p) {
167
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
168 169
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
170 171 172
        s->bind (p->first.c_str ());
        s->close ();
    }
173
    _terminating = save_terminating;
174

175
    if (!_starting) {
176
#ifdef HAVE_FORK
177
        if (_pid != getpid ()) {
178 179
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
180 181
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
                 i++) {
182
                _sockets[i]->get_mailbox ()->forked ();
183
            }
184
            _term_mailbox.forked ();
185 186
        }
#endif
187

188 189
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
190
        const bool restarted = _terminating;
191
        _terminating = true;
192

193 194 195 196 197
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
198 199
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
                 i++) {
200
                _sockets[i]->stop ();
201
            }
202 203
            if (_sockets.empty ())
                _reaper->stop ();
204
        }
205
        _slot_sync.unlock ();
206 207 208

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
209
        const int rc = _term_mailbox.recv (&cmd, -1);
210 211
        if (rc == -1 && errno == EINTR)
            return -1;
212
        errno_assert (rc == 0);
213
        zmq_assert (cmd.type == command_t::done);
214 215
        _slot_sync.lock ();
        zmq_assert (_sockets.empty ());
216
    }
217
    _slot_sync.unlock ();
218

Ilya Kulakov's avatar
Ilya Kulakov committed
219
#ifdef ZMQ_HAVE_VMCI
220
    _vmci_sync.lock ();
Ilya Kulakov's avatar
Ilya Kulakov committed
221

222 223 224
    VMCISock_ReleaseAFValueFd (_vmci_fd);
    _vmci_family = -1;
    _vmci_fd = -1;
Ilya Kulakov's avatar
Ilya Kulakov committed
225

226
    _vmci_sync.unlock ();
Ilya Kulakov's avatar
Ilya Kulakov committed
227 228
#endif

229 230
    //  Deallocate the resources.
    delete this;
231

232 233
    return 0;
}
234

235 236
int zmq::ctx_t::shutdown ()
{
237
    scoped_lock_t locker (_slot_sync);
238

239
    if (!_terminating) {
240
        _terminating = true;
241

242 243 244 245
        if (!_starting) {
            //  Send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
246 247
            for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
                 i++) {
248
                _sockets[i]->stop ();
249
            }
250 251 252
            if (_sockets.empty ())
                _reaper->stop ();
        }
253 254 255 256 257
    }

    return 0;
}

258
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
259
{
260
    const bool is_int = (optvallen_ == sizeof (int));
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
    int value = 0;
    if (is_int)
        memcpy (&value, optval_, sizeof (int));

    switch (option_) {
        case ZMQ_MAX_SOCKETS:
            if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
                scoped_lock_t locker (_opt_sync);
                _max_sockets = value;
                return 0;
            }
            break;

        case ZMQ_IO_THREADS:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _io_thread_count = value;
                return 0;
            }
            break;

        case ZMQ_IPV6:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _ipv6 = (value != 0);
                return 0;
            }
            break;

        case ZMQ_BLOCKY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _blocky = (value != 0);
                return 0;
            }
            break;

        case ZMQ_MAX_MSGSZ:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _max_msgsz = value < INT_MAX ? value : INT_MAX;
                return 0;
            }
            break;

        case ZMQ_ZERO_COPY_RECV:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _zero_copy = (value != 0);
                return 0;
            }
            break;

        default: {
            return thread_ctx_t::set (option_, optval_, optvallen_);
        }
317
    }
318 319 320

    errno = EINVAL;
    return -1;
321 322
}

323
int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_)
324
{
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
    const bool is_int = (*optvallen_ == sizeof (int));
    int *value = static_cast<int *> (optval_);

    switch (option_) {
        case ZMQ_MAX_SOCKETS:
            if (is_int) {
                *value = _max_sockets;
                return 0;
            }
            break;

        case ZMQ_SOCKET_LIMIT:
            if (is_int) {
                *value = clipped_maxsocket (65535);
                return 0;
            }
            break;

        case ZMQ_IO_THREADS:
            if (is_int) {
                *value = _io_thread_count;
                return 0;
            }
            break;

        case ZMQ_IPV6:
            if (is_int) {
                *value = _ipv6;
                return 0;
            }
            break;

        case ZMQ_BLOCKY:
            if (is_int) {
                *value = _blocky;
                return 0;
            }
            break;

        case ZMQ_MAX_MSGSZ:
            if (is_int) {
                *value = _max_msgsz;
                return 0;
            }
            break;

        case ZMQ_MSG_T_SIZE:
            if (is_int) {
                *value = sizeof (zmq_msg_t);
                return 0;
            }
            break;

        case ZMQ_ZERO_COPY_RECV:
            if (is_int) {
                *value = _zero_copy;
                return 0;
            }
            break;

        default: {
            return thread_ctx_t::get (option_, optval_, optvallen_);
        }
388
    }
389 390 391 392 393 394 395

    errno = EINVAL;
    return -1;
}

int zmq::ctx_t::get (int option_)
{
396 397
    int optval = 0;
    size_t optvallen = sizeof (int);
398

399 400
    if (get (option_, &optval, &optvallen) == 0)
        return optval;
401 402 403

    errno = EINVAL;
    return -1;
404 405
}

406
bool zmq::ctx_t::start ()
407
{
408
    //  Initialise the array of mailboxes. Additional two slots are for
409
    //  zmq_ctx_term thread and reaper thread.
410
    _opt_sync.lock ();
411 412 413
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets;
    const int ios = _io_thread_count;
414
    _opt_sync.unlock ();
415
    const int slot_count = mazmq + ios + term_and_reaper_threads_count;
416 417 418 419 420
    try {
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
421
        errno = ENOMEM;
422
        return false;
423
    }
424
    _slots.resize (term_and_reaper_threads_count);
425

426
    //  Initialise the infrastructure for zmq_ctx_term thread.
427
    _slots[term_tid] = &_term_mailbox;
428

429
    //  Create the reaper thread.
430 431
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
432 433 434
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
435
    if (!_reaper->get_mailbox ()->valid ())
436
        goto fail_cleanup_reaper;
437 438
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();
439 440

    //  Create I/O thread objects and launch them.
441
    _slots.resize (slot_count, NULL);
442

443 444
    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
445 446 447 448 449 450 451 452
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
453
        }
454 455
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
456 457 458 459
        io_thread->start ();
    }

    //  In the unused part of the slot array, create a list of empty slots.
460 461
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
462
        _empty_slots.push_back (i);
463 464
    }

465
    _starting = false;
466 467 468
    return true;

fail_cleanup_reaper:
469 470 471
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;
472 473

fail_cleanup_slots:
474
    _slots.clear ();
475 476 477 478 479
    return false;
}

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
480
    scoped_lock_t locker (_slot_sync);
481

482 483
    //  Once zmq_ctx_term() or zmq_ctx_shutdown() was called, we can't create
    //  new sockets.
484
    if (_terminating) {
485 486 487
        errno = ETERM;
        return NULL;
    }
488

489 490 491 492 493
    if (unlikely (_starting)) {
        if (!start ())
            return NULL;
    }

494
    //  If max_sockets limit was reached, return error.
495
    if (_empty_slots.empty ()) {
496 497
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
498
    }
499

500
    //  Choose a slot for the socket.
501
    const uint32_t slot = _empty_slots.back ();
502
    _empty_slots.pop_back ();
503

504
    //  Generate new unique socket ID.
505
    const int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
506

507
    //  Create the socket and register its mailbox.
508
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
509
    if (!s) {
510
        _empty_slots.push_back (slot);
511
        return NULL;
512
    }
513 514
    _sockets.push_back (s);
    _slots[slot] = s->get_mailbox ();
515 516

    return s;
Martin Sustrik's avatar
Martin Sustrik committed
517 518
}

519
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
520
{
521
    scoped_lock_t locker (_slot_sync);
522

Martin Hurton's avatar
Martin Hurton committed
523
    //  Free the associated thread slot.
524
    const uint32_t tid = socket_->get_tid ();
525 526
    _empty_slots.push_back (tid);
    _slots[tid] = NULL;
527

528
    //  Remove the socket from the list of sockets.
529
    _sockets.erase (socket_);
530

Pieter Hintjens's avatar
Pieter Hintjens committed
531
    //  If zmq_ctx_term() was already called and there are no more socket
532
    //  we can ask reaper thread to terminate.
533 534
    if (_terminating && _sockets.empty ())
        _reaper->stop ();
535 536
}

537
zmq::object_t *zmq::ctx_t::get_reaper () const
538
{
539
    return _reaper;
540 541
}

542
zmq::thread_ctx_t::thread_ctx_t () :
543 544
    _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
545 546 547 548 549
{
}

void zmq::thread_ctx_t::start_thread (thread_t &thread_,
                                      thread_fn *tfn_,
550 551
                                      void *arg_,
                                      const char *name_) const
552
{
553 554
    thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
                                     _thread_affinity_cpus);
555

556
    char namebuf[16] = "";
557 558 559 560 561
    snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
              _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
              _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
              name_ ? name_ : "");
    thread_.start (tfn_, arg_, namebuf);
562 563
}

564
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
565
{
566
    const bool is_int = (optvallen_ == sizeof (int));
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621
    int value = 0;
    if (is_int)
        memcpy (&value, optval_, sizeof (int));

    switch (option_) {
        case ZMQ_THREAD_SCHED_POLICY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_sched_policy = value;
                return 0;
            }
            break;

        case ZMQ_THREAD_AFFINITY_CPU_ADD:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_affinity_cpus.insert (value);
                return 0;
            }
            break;

        case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                if (0 == _thread_affinity_cpus.erase (value)) {
                    errno = EINVAL;
                    return -1;
                }
                return 0;
            }
            break;

        case ZMQ_THREAD_PRIORITY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_priority = value;
                return 0;
            }
            break;

        case ZMQ_THREAD_NAME_PREFIX:
            // start_thread() allows max 16 chars for thread name
            if (is_int) {
                std::ostringstream s;
                s << value;
                scoped_lock_t locker (_opt_sync);
                _thread_name_prefix = s.str ();
                return 0;
            } else if (optvallen_ > 0 && optvallen_ <= 16) {
                scoped_lock_t locker (_opt_sync);
                _thread_name_prefix.assign (static_cast<const char *> (optval_),
                                            optvallen_);
                return 0;
            }
            break;
622
    }
623 624 625

    errno = EINVAL;
    return -1;
626 627
}

628 629 630
int zmq::thread_ctx_t::get (int option_,
                            void *optval_,
                            const size_t *optvallen_)
631
{
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
    const bool is_int = (*optvallen_ == sizeof (int));
    int *value = static_cast<int *> (optval_);

    switch (option_) {
        case ZMQ_THREAD_SCHED_POLICY:
            if (is_int) {
                scoped_lock_t locker (_opt_sync);
                *value = _thread_sched_policy;
                return 0;
            }
            break;

        case ZMQ_THREAD_NAME_PREFIX:
            if (is_int) {
                scoped_lock_t locker (_opt_sync);
                *value = atoi (_thread_name_prefix.c_str ());
                return 0;
            } else if (*optvallen_ >= _thread_name_prefix.size ()) {
                scoped_lock_t locker (_opt_sync);
                memcpy (optval_, _thread_name_prefix.data (),
                        _thread_name_prefix.size ());
                return 0;
            }
            break;
656
    }
657 658 659

    errno = EINVAL;
    return -1;
660 661
}

Martin Sustrik's avatar
Martin Sustrik committed
662
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
663
{
664
    _slots[tid_]->send (command_);
665 666
}

667
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
668
{
669
    if (_io_threads.empty ())
670 671
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
672
    //  Find the I/O thread with minimum load.
673
    int min_load = -1;
674
    io_thread_t *selected_io_thread = NULL;
675 676
    for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
         i++) {
677
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
678
            const int load = _io_threads[i]->get_load ();
679
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
680
                min_load = load;
681
                selected_io_thread = _io_threads[i];
Martin Sustrik's avatar
Martin Sustrik committed
682 683 684
            }
        }
    }
685
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
686
}
Martin Sustrik's avatar
Martin Sustrik committed
687

688
int zmq::ctx_t::register_endpoint (const char *addr_,
689
                                   const endpoint_t &endpoint_)
690
{
691
    scoped_lock_t locker (_endpoints_sync);
692

693
    const bool inserted =
694
      _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
695
        .second;
696 697 698 699 700 701 702
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

703
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
704
                                     const socket_base_t *const socket_)
Martin Hurton's avatar
Martin Hurton committed
705
{
706
    scoped_lock_t locker (_endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
707

708 709
    const endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end () || it->second.socket != socket_) {
Martin Hurton's avatar
Martin Hurton committed
710 711 712 713 714
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
715
    _endpoints.erase (it);
Martin Hurton's avatar
Martin Hurton committed
716 717 718 719

    return 0;
}

720
void zmq::ctx_t::unregister_endpoints (const socket_base_t *const socket_)
721
{
722
    scoped_lock_t locker (_endpoints_sync);
723

724 725 726
    for (endpoints_t::iterator it = _endpoints.begin (),
                               end = _endpoints.end ();
         it != end;) {
727 728 729 730 731 732 733
        if (it->second.socket == socket_)
#if __cplusplus >= 201103L
            it = _endpoints.erase (it);
#else
            _endpoints.erase (it++);
#endif
        else
734
            ++it;
735 736 737
    }
}

738
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
739
{
740
    scoped_lock_t locker (_endpoints_sync);
741

742 743
    endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end ()) {
744
        errno = ECONNREFUSED;
745
        endpoint_t empty = {NULL, options_t ()};
746
        return empty;
747 748
    }
    endpoint_t endpoint = it->second;
749

750 751 752 753 754
    //  Increment the command sequence number of the peer so that it won't
    //  get deallocated until "bind" command is issued by the caller.
    //  The subsequent 'bind' has to be called with inc_seqnum parameter
    //  set to false, so that the seqnum isn't incremented twice.
    endpoint.socket->inc_seqnum ();
755

756
    return endpoint;
757
}
758

Martin Hurton's avatar
Martin Hurton committed
759
void zmq::ctx_t::pend_connection (const std::string &addr_,
760 761
                                  const endpoint_t &endpoint_,
                                  pipe_t **pipes_)
762
{
763
    scoped_lock_t locker (_endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
764

765 766
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
                                                     pipes_[1]};
767

768
    const endpoints_t::iterator it = _endpoints.find (addr_);
769
    if (it == _endpoints.end ()) {
770
        //  Still no bind.
Martin Hurton's avatar
Martin Hurton committed
771
        endpoint_.socket->inc_seqnum ();
772 773
        _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
                                                        pending_connection);
774
    } else {
775
        //  Bind has happened in the mean time, connect directly
776 777
        connect_inproc_sockets (it->second.socket, it->second.options,
                                pending_connection, connect_side);
778
    }
779 780
}

781 782
void zmq::ctx_t::connect_pending (const char *addr_,
                                  zmq::socket_base_t *bind_socket_)
783
{
784
    scoped_lock_t locker (_endpoints_sync);
785

786 787
    const std::pair<pending_connections_t::iterator,
                    pending_connections_t::iterator>
788
      pending = _pending_connections.equal_range (addr_);
789 790
    for (pending_connections_t::iterator p = pending.first; p != pending.second;
         ++p)
791
        connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
792
                                p->second, bind_side);
793

794
    _pending_connections.erase (pending.first, pending.second);
795 796
}

797 798
void zmq::ctx_t::connect_inproc_sockets (
  zmq::socket_base_t *bind_socket_,
799
  const options_t &bind_options_,
800 801
  const pending_connection_t &pending_connection_,
  side side_)
802
{
803
    bind_socket_->inc_seqnum ();
Martin Hurton's avatar
Martin Hurton committed
804
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
805

806
    if (!bind_options_.recv_routing_id) {
807 808 809 810 811 812 813
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

814
    if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
815 816
        pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
                                                          bind_options_.rcvhwm);
817 818 819 820 821 822 823
        pending_connection_.bind_pipe->set_hwms_boost (
          pending_connection_.endpoint.options.sndhwm,
          pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms (
          pending_connection_.endpoint.options.rcvhwm,
          pending_connection_.endpoint.options.sndhwm);
824 825
        pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
                                                 bind_options_.sndhwm);
826 827 828
    } else {
        pending_connection_.connect_pipe->set_hwms (-1, -1);
        pending_connection_.bind_pipe->set_hwms (-1, -1);
829
    }
830 831 832 833 834 835

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
836 837 838 839 840
        bind_socket_->send_inproc_connected (
          pending_connection_.endpoint.socket);
    } else
        pending_connection_.connect_pipe->send_bind (
          bind_socket_, pending_connection_.bind_pipe, false);
841

842 843 844
    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
845
    // and the routing id write fails and causes an assert. Check if the socket
846
    // is open before sending.
847 848
    if (pending_connection_.endpoint.options.recv_routing_id
        && pending_connection_.endpoint.socket->check_tag ()) {
849
        send_routing_id (pending_connection_.bind_pipe, bind_options_);
850
    }
851 852 853 854 855 856 857 858

#ifdef ZMQ_BUILD_DRAFT_API
    //  If set, send the hello msg of the bind socket to the pending connection.
    if (bind_options_.can_send_hello_msg
        && bind_options_.hello_msg.size () > 0) {
        send_hello_msg (pending_connection_.bind_pipe, bind_options_);
    }
#endif
859 860
}

Ilya Kulakov's avatar
Ilya Kulakov committed
861 862 863 864
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
865
    zmq::scoped_lock_t locker (_vmci_sync);
Ilya Kulakov's avatar
Ilya Kulakov committed
866

867 868
    if (_vmci_fd == -1) {
        _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
Ilya Kulakov's avatar
Ilya Kulakov committed
869

870
        if (_vmci_fd != -1) {
Ilya Kulakov's avatar
Ilya Kulakov committed
871
#ifdef FD_CLOEXEC
872
            int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
Ilya Kulakov's avatar
Ilya Kulakov committed
873 874 875 876 877
            errno_assert (rc != -1);
#endif
        }
    }

878
    return _vmci_family;
Ilya Kulakov's avatar
Ilya Kulakov committed
879 880 881 882
}

#endif

883 884 885 886
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;