ctx.cpp 24.4 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32
#ifndef ZMQ_HAVE_WINDOWS
33 34 35
#include <unistd.h>
#endif

36
#include <limits>
37
#include <climits>
38
#include <new>
f18m's avatar
f18m committed
39
#include <sstream>
40
#include <string.h>
41

42
#include "ctx.hpp"
43
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
44
#include "io_thread.hpp"
45
#include "reaper.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
46
#include "pipe.hpp"
47 48
#include "err.hpp"
#include "msg.hpp"
49
#include "random.hpp"
50

Ilya Kulakov's avatar
Ilya Kulakov committed
51 52 53 54
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif

55 56 57 58
#ifdef ZMQ_USE_NSS
#include <nss.h>
#endif

59 60 61 62
#ifdef ZMQ_USE_GNUTLS
#include <gnutls/gnutls.h>
#endif

63
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
64
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
65

66
int clipped_maxsocket (int max_requested_)
67
{
68
    if (max_requested_ >= zmq::poller_t::max_fds ()
69
        && zmq::poller_t::max_fds () != -1)
Pieter Hintjens's avatar
Pieter Hintjens committed
70
        // -1 because we need room for the reaper mailbox.
71
        max_requested_ = zmq::poller_t::max_fds () - 1;
Martin Hurton's avatar
Martin Hurton committed
72

73
    return max_requested_;
74 75
}

76
zmq::ctx_t::ctx_t () :
77 78 79 80 81 82 83 84 85 86
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
Martin Sustrik's avatar
Martin Sustrik committed
87
{
88
#ifdef HAVE_FORK
89
    _pid = getpid ();
90
#endif
Ilya Kulakov's avatar
Ilya Kulakov committed
91
#ifdef ZMQ_HAVE_VMCI
92 93
    _vmci_fd = -1;
    _vmci_family = -1;
Ilya Kulakov's avatar
Ilya Kulakov committed
94
#endif
95

96 97
    //  Initialise crypto library, if needed.
    zmq::random_open ();
98 99 100 101

#ifdef ZMQ_USE_NSS
    NSS_NoDB_Init (NULL);
#endif
102 103 104 105

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
106 107
}

108 109
bool zmq::ctx_t::check_tag ()
{
110
    return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
111 112
}

113
zmq::ctx_t::~ctx_t ()
Martin Sustrik's avatar
Martin Sustrik committed
114
{
115 116
    //  Check that there are no remaining _sockets.
    zmq_assert (_sockets.empty ());
117

118 119
    //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    //  thread subsequent invocation of destructor would hang-up.
120 121
    for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
        _io_threads[i]->stop ();
122
    }
Martin Sustrik's avatar
Martin Sustrik committed
123 124

    //  Wait till I/O threads actually terminate.
125 126
    for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
        LIBZMQ_DELETE (_io_threads[i]);
127
    }
Martin Sustrik's avatar
Martin Sustrik committed
128

129
    //  Deallocate the reaper thread object.
130
    LIBZMQ_DELETE (_reaper);
131

132
    //  The mailboxes in _slots themselves were deallocated with their
133
    //  corresponding io_thread/socket objects.
134

135 136
    //  De-initialise crypto library, if needed.
    zmq::random_close ();
137

138 139 140 141
#ifdef ZMQ_USE_NSS
    NSS_Shutdown ();
#endif

142 143 144 145
#ifdef ZMQ_USE_GNUTLS
    gnutls_global_deinit ();
#endif

146
    //  Remove the tag, so that the object is considered dead.
147
    _tag = ZMQ_CTX_TAG_VALUE_BAD;
Martin Sustrik's avatar
Martin Sustrik committed
148 149
}

150 151
bool zmq::ctx_t::valid () const
{
152
    return _term_mailbox.valid ();
153 154
}

155
int zmq::ctx_t::terminate ()
Martin Sustrik's avatar
Martin Sustrik committed
156
{
157
    _slot_sync.lock ();
158

159 160
    bool save_terminating = _terminating;
    _terminating = false;
161

162
    // Connect up any pending inproc connections, otherwise we will hang
163
    pending_connections_t copy = _pending_connections;
164 165
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
         p != end; ++p) {
166
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
167 168
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
169 170 171
        s->bind (p->first.c_str ());
        s->close ();
    }
172
    _terminating = save_terminating;
173

174
    if (!_starting) {
175
#ifdef HAVE_FORK
176
        if (_pid != getpid ()) {
177 178
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
179 180
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->get_mailbox ()->forked ();
181

182
            _term_mailbox.forked ();
183 184
        }
#endif
185

186 187
        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
188 189
        bool restarted = _terminating;
        _terminating = true;
190

191 192 193 194 195
        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
196 197 198 199
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->stop ();
            if (_sockets.empty ())
                _reaper->stop ();
200
        }
201
        _slot_sync.unlock ();
202 203 204

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
205
        int rc = _term_mailbox.recv (&cmd, -1);
206 207
        if (rc == -1 && errno == EINTR)
            return -1;
208
        errno_assert (rc == 0);
209
        zmq_assert (cmd.type == command_t::done);
210 211
        _slot_sync.lock ();
        zmq_assert (_sockets.empty ());
212
    }
213
    _slot_sync.unlock ();
214

Ilya Kulakov's avatar
Ilya Kulakov committed
215
#ifdef ZMQ_HAVE_VMCI
216
    _vmci_sync.lock ();
Ilya Kulakov's avatar
Ilya Kulakov committed
217

218 219 220
    VMCISock_ReleaseAFValueFd (_vmci_fd);
    _vmci_family = -1;
    _vmci_fd = -1;
Ilya Kulakov's avatar
Ilya Kulakov committed
221

222
    _vmci_sync.unlock ();
Ilya Kulakov's avatar
Ilya Kulakov committed
223 224
#endif

225 226
    //  Deallocate the resources.
    delete this;
227

228 229
    return 0;
}
230

231 232
int zmq::ctx_t::shutdown ()
{
233
    scoped_lock_t locker (_slot_sync);
234

235 236
    if (!_starting && !_terminating) {
        _terminating = true;
237 238 239 240

        //  Send stop command to sockets so that any blocking calls
        //  can be interrupted. If there are no sockets we can ask reaper
        //  thread to stop.
241 242 243 244
        for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
            _sockets[i]->stop ();
        if (_sockets.empty ())
            _reaper->stop ();
245 246 247 248 249
    }

    return 0;
}

250
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
251
{
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
    bool is_int = (optvallen_ == sizeof (int));
    int value = 0;
    if (is_int)
        memcpy (&value, optval_, sizeof (int));

    switch (option_) {
        case ZMQ_MAX_SOCKETS:
            if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
                scoped_lock_t locker (_opt_sync);
                _max_sockets = value;
                return 0;
            }
            break;

        case ZMQ_IO_THREADS:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _io_thread_count = value;
                return 0;
            }
            break;

        case ZMQ_IPV6:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _ipv6 = (value != 0);
                return 0;
            }
            break;

        case ZMQ_BLOCKY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _blocky = (value != 0);
                return 0;
            }
            break;

        case ZMQ_MAX_MSGSZ:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _max_msgsz = value < INT_MAX ? value : INT_MAX;
                return 0;
            }
            break;

        case ZMQ_ZERO_COPY_RECV:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _zero_copy = (value != 0);
                return 0;
            }
            break;

        default: {
            return thread_ctx_t::set (option_, optval_, optvallen_);
        }
309
    }
310 311 312

    errno = EINVAL;
    return -1;
313 314
}

315
int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_)
316
{
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
    const bool is_int = (*optvallen_ == sizeof (int));
    int *value = static_cast<int *> (optval_);

    switch (option_) {
        case ZMQ_MAX_SOCKETS:
            if (is_int) {
                *value = _max_sockets;
                return 0;
            }
            break;

        case ZMQ_SOCKET_LIMIT:
            if (is_int) {
                *value = clipped_maxsocket (65535);
                return 0;
            }
            break;

        case ZMQ_IO_THREADS:
            if (is_int) {
                *value = _io_thread_count;
                return 0;
            }
            break;

        case ZMQ_IPV6:
            if (is_int) {
                *value = _ipv6;
                return 0;
            }
            break;

        case ZMQ_BLOCKY:
            if (is_int) {
                *value = _blocky;
                return 0;
            }
            break;

        case ZMQ_MAX_MSGSZ:
            if (is_int) {
                *value = _max_msgsz;
                return 0;
            }
            break;

        case ZMQ_MSG_T_SIZE:
            if (is_int) {
                *value = sizeof (zmq_msg_t);
                return 0;
            }
            break;

        case ZMQ_ZERO_COPY_RECV:
            if (is_int) {
                *value = _zero_copy;
                return 0;
            }
            break;

        default: {
            return thread_ctx_t::get (option_, optval_, optvallen_);
        }
380
    }
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395

    errno = EINVAL;
    return -1;
}

int zmq::ctx_t::get (int option_)
{
    int optval_ = 0;
    size_t optvallen_ = sizeof (int);

    if (get (option_, &optval_, &optvallen_) == 0)
        return optval_;

    errno = EINVAL;
    return -1;
396 397
}

398
bool zmq::ctx_t::start ()
399
{
400
    //  Initialise the array of mailboxes. Additional two slots are for
401
    //  zmq_ctx_term thread and reaper thread.
402
    _opt_sync.lock ();
403 404 405
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets;
    const int ios = _io_thread_count;
406
    _opt_sync.unlock ();
407 408 409 410 411 412
    int slot_count = mazmq + ios + term_and_reaper_threads_count;
    try {
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
413
        errno = ENOMEM;
414
        return false;
415
    }
416
    _slots.resize (term_and_reaper_threads_count);
417

418
    //  Initialise the infrastructure for zmq_ctx_term thread.
419
    _slots[term_tid] = &_term_mailbox;
420

421
    //  Create the reaper thread.
422 423
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
424 425 426
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
427
    if (!_reaper->get_mailbox ()->valid ())
428
        goto fail_cleanup_reaper;
429 430
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();
431 432

    //  Create I/O thread objects and launch them.
433
    _slots.resize (slot_count, NULL);
434

435 436
    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
437 438 439 440 441 442 443 444
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
445
        }
446 447
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
448 449 450 451
        io_thread->start ();
    }

    //  In the unused part of the slot array, create a list of empty slots.
452 453
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
454
        _empty_slots.push_back (i);
455 456
    }

457
    _starting = false;
458 459 460
    return true;

fail_cleanup_reaper:
461 462 463
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;
464 465

fail_cleanup_slots:
466
    _slots.clear ();
467 468 469 470 471
    return false;
}

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
472
    scoped_lock_t locker (_slot_sync);
473

474
    if (unlikely (_starting)) {
475 476
        if (!start ())
            return NULL;
477 478
    }

Pieter Hintjens's avatar
Pieter Hintjens committed
479
    //  Once zmq_ctx_term() was called, we can't create new sockets.
480
    if (_terminating) {
481 482 483
        errno = ETERM;
        return NULL;
    }
484

485
    //  If max_sockets limit was reached, return error.
486
    if (_empty_slots.empty ()) {
487 488
        errno = EMFILE;
        return NULL;
Martin Sustrik's avatar
Martin Sustrik committed
489
    }
490

491
    //  Choose a slot for the socket.
492 493
    uint32_t slot = _empty_slots.back ();
    _empty_slots.pop_back ();
494

495
    //  Generate new unique socket ID.
496
    int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
497

498
    //  Create the socket and register its mailbox.
499
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
500
    if (!s) {
501
        _empty_slots.push_back (slot);
502
        return NULL;
503
    }
504 505
    _sockets.push_back (s);
    _slots[slot] = s->get_mailbox ();
506 507

    return s;
Martin Sustrik's avatar
Martin Sustrik committed
508 509
}

510
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
511
{
512
    scoped_lock_t locker (_slot_sync);
513

Martin Hurton's avatar
Martin Hurton committed
514
    //  Free the associated thread slot.
515
    uint32_t tid = socket_->get_tid ();
516 517
    _empty_slots.push_back (tid);
    _slots[tid] = NULL;
518

519
    //  Remove the socket from the list of sockets.
520
    _sockets.erase (socket_);
521

Pieter Hintjens's avatar
Pieter Hintjens committed
522
    //  If zmq_ctx_term() was already called and there are no more socket
523
    //  we can ask reaper thread to terminate.
524 525
    if (_terminating && _sockets.empty ())
        _reaper->stop ();
526 527
}

528 529
zmq::object_t *zmq::ctx_t::get_reaper ()
{
530
    return _reaper;
531 532
}

533
zmq::thread_ctx_t::thread_ctx_t () :
534 535
    _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
536 537 538 539 540
{
}

void zmq::thread_ctx_t::start_thread (thread_t &thread_,
                                      thread_fn *tfn_,
541 542
                                      void *arg_,
                                      const char *name_) const
543
{
544 545
    thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
                                     _thread_affinity_cpus);
546

547
    char namebuf[16] = "";
548 549 550 551 552
    snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
              _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
              _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
              name_ ? name_ : "");
    thread_.start (tfn_, arg_, namebuf);
553 554
}

555
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
556
{
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
    bool is_int = (optvallen_ == sizeof (int));
    int value = 0;
    if (is_int)
        memcpy (&value, optval_, sizeof (int));

    switch (option_) {
        case ZMQ_THREAD_SCHED_POLICY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_sched_policy = value;
                return 0;
            }
            break;

        case ZMQ_THREAD_AFFINITY_CPU_ADD:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_affinity_cpus.insert (value);
                return 0;
            }
            break;

        case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                if (0 == _thread_affinity_cpus.erase (value)) {
                    errno = EINVAL;
                    return -1;
                }
                return 0;
            }
            break;

        case ZMQ_THREAD_PRIORITY:
            if (is_int && value >= 0) {
                scoped_lock_t locker (_opt_sync);
                _thread_priority = value;
                return 0;
            }
            break;

        case ZMQ_THREAD_NAME_PREFIX:
            // start_thread() allows max 16 chars for thread name
            if (is_int) {
                std::ostringstream s;
                s << value;
                scoped_lock_t locker (_opt_sync);
                _thread_name_prefix = s.str ();
                return 0;
            } else if (optvallen_ > 0 && optvallen_ <= 16) {
                scoped_lock_t locker (_opt_sync);
                _thread_name_prefix.assign (static_cast<const char *> (optval_),
                                            optvallen_);
                return 0;
            }
            break;
613
    }
614 615 616

    errno = EINVAL;
    return -1;
617 618
}

619
int zmq::thread_ctx_t::get (int option_, void *optval_, size_t *optvallen_)
620
{
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644
    const bool is_int = (*optvallen_ == sizeof (int));
    int *value = static_cast<int *> (optval_);

    switch (option_) {
        case ZMQ_THREAD_SCHED_POLICY:
            if (is_int) {
                scoped_lock_t locker (_opt_sync);
                *value = _thread_sched_policy;
                return 0;
            }
            break;

        case ZMQ_THREAD_NAME_PREFIX:
            if (is_int) {
                scoped_lock_t locker (_opt_sync);
                *value = atoi (_thread_name_prefix.c_str ());
                return 0;
            } else if (*optvallen_ >= _thread_name_prefix.size ()) {
                scoped_lock_t locker (_opt_sync);
                memcpy (optval_, _thread_name_prefix.data (),
                        _thread_name_prefix.size ());
                return 0;
            }
            break;
645
    }
646 647 648

    errno = EINVAL;
    return -1;
649 650
}

Martin Sustrik's avatar
Martin Sustrik committed
651
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
652
{
653
    _slots[tid_]->send (command_);
654 655
}

656
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
Martin Sustrik's avatar
Martin Sustrik committed
657
{
658
    if (_io_threads.empty ())
659 660
        return NULL;

Martin Sustrik's avatar
Martin Sustrik committed
661
    //  Find the I/O thread with minimum load.
662
    int min_load = -1;
663
    io_thread_t *selected_io_thread = NULL;
664
    for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
665
        if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
666
            int load = _io_threads[i]->get_load ();
667
            if (selected_io_thread == NULL || load < min_load) {
Martin Sustrik's avatar
Martin Sustrik committed
668
                min_load = load;
669
                selected_io_thread = _io_threads[i];
Martin Sustrik's avatar
Martin Sustrik committed
670 671 672
            }
        }
    }
673
    return selected_io_thread;
Martin Sustrik's avatar
Martin Sustrik committed
674
}
Martin Sustrik's avatar
Martin Sustrik committed
675

676
int zmq::ctx_t::register_endpoint (const char *addr_,
677
                                   const endpoint_t &endpoint_)
678
{
679
    scoped_lock_t locker (_endpoints_sync);
680

681
    const bool inserted =
682
      _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
683
        .second;
684 685 686 687 688 689 690
    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}

691 692
int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
                                     socket_base_t *socket_)
Martin Hurton's avatar
Martin Hurton committed
693
{
694
    scoped_lock_t locker (_endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
695

696 697
    const endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end () || it->second.socket != socket_) {
Martin Hurton's avatar
Martin Hurton committed
698 699 700 701 702
        errno = ENOENT;
        return -1;
    }

    //  Remove endpoint.
703
    _endpoints.erase (it);
Martin Hurton's avatar
Martin Hurton committed
704 705 706 707

    return 0;
}

708
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
709
{
710
    scoped_lock_t locker (_endpoints_sync);
711

712 713 714
    for (endpoints_t::iterator it = _endpoints.begin (),
                               end = _endpoints.end ();
         it != end;) {
715 716 717 718 719 720 721
        if (it->second.socket == socket_)
#if __cplusplus >= 201103L
            it = _endpoints.erase (it);
#else
            _endpoints.erase (it++);
#endif
        else
722
            ++it;
723 724 725
    }
}

726
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
727
{
728
    scoped_lock_t locker (_endpoints_sync);
729

730 731
    endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end ()) {
732
        errno = ECONNREFUSED;
733
        endpoint_t empty = {NULL, options_t ()};
734
        return empty;
735 736
    }
    endpoint_t endpoint = it->second;
737

738 739 740 741 742
    //  Increment the command sequence number of the peer so that it won't
    //  get deallocated until "bind" command is issued by the caller.
    //  The subsequent 'bind' has to be called with inc_seqnum parameter
    //  set to false, so that the seqnum isn't incremented twice.
    endpoint.socket->inc_seqnum ();
743

744
    return endpoint;
745
}
746

Martin Hurton's avatar
Martin Hurton committed
747
void zmq::ctx_t::pend_connection (const std::string &addr_,
748 749
                                  const endpoint_t &endpoint_,
                                  pipe_t **pipes_)
750
{
751
    scoped_lock_t locker (_endpoints_sync);
Martin Hurton's avatar
Martin Hurton committed
752

753 754
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
                                                     pipes_[1]};
755

756 757
    endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end ()) {
758
        //  Still no bind.
Martin Hurton's avatar
Martin Hurton committed
759
        endpoint_.socket->inc_seqnum ();
760 761
        _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
                                                        pending_connection);
762
    } else {
763
        //  Bind has happened in the mean time, connect directly
764 765
        connect_inproc_sockets (it->second.socket, it->second.options,
                                pending_connection, connect_side);
766
    }
767 768
}

769 770
void zmq::ctx_t::connect_pending (const char *addr_,
                                  zmq::socket_base_t *bind_socket_)
771
{
772
    scoped_lock_t locker (_endpoints_sync);
773

774
    std::pair<pending_connections_t::iterator, pending_connections_t::iterator>
775
      pending = _pending_connections.equal_range (addr_);
776 777
    for (pending_connections_t::iterator p = pending.first; p != pending.second;
         ++p)
778
        connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
779
                                p->second, bind_side);
780

781
    _pending_connections.erase (pending.first, pending.second);
782 783
}

784 785
void zmq::ctx_t::connect_inproc_sockets (
  zmq::socket_base_t *bind_socket_,
786
  options_t &bind_options_,
787 788
  const pending_connection_t &pending_connection_,
  side side_)
789
{
790
    bind_socket_->inc_seqnum ();
Martin Hurton's avatar
Martin Hurton committed
791
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
792

793
    if (!bind_options_.recv_routing_id) {
794 795 796 797 798 799 800
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

801
    if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
802 803
        pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
                                                          bind_options_.rcvhwm);
804 805 806 807 808 809 810
        pending_connection_.bind_pipe->set_hwms_boost (
          pending_connection_.endpoint.options.sndhwm,
          pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms (
          pending_connection_.endpoint.options.rcvhwm,
          pending_connection_.endpoint.options.sndhwm);
811 812
        pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
                                                 bind_options_.sndhwm);
813 814 815
    } else {
        pending_connection_.connect_pipe->set_hwms (-1, -1);
        pending_connection_.bind_pipe->set_hwms (-1, -1);
816
    }
817 818 819 820 821 822

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
823 824 825 826 827
        bind_socket_->send_inproc_connected (
          pending_connection_.endpoint.socket);
    } else
        pending_connection_.connect_pipe->send_bind (
          bind_socket_, pending_connection_.bind_pipe, false);
828

829 830 831
    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
832
    // and the routing id write fails and causes an assert. Check if the socket
833
    // is open before sending.
834 835
    if (pending_connection_.endpoint.options.recv_routing_id
        && pending_connection_.endpoint.socket->check_tag ()) {
836
        send_routing_id (pending_connection_.bind_pipe, bind_options_);
837 838 839
    }
}

Ilya Kulakov's avatar
Ilya Kulakov committed
840 841 842 843
#ifdef ZMQ_HAVE_VMCI

int zmq::ctx_t::get_vmci_socket_family ()
{
844
    zmq::scoped_lock_t locker (_vmci_sync);
Ilya Kulakov's avatar
Ilya Kulakov committed
845

846 847
    if (_vmci_fd == -1) {
        _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);
Ilya Kulakov's avatar
Ilya Kulakov committed
848

849
        if (_vmci_fd != -1) {
Ilya Kulakov's avatar
Ilya Kulakov committed
850
#ifdef FD_CLOEXEC
851
            int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
Ilya Kulakov's avatar
Ilya Kulakov committed
852 853 854 855 856
            errno_assert (rc != -1);
#endif
        }
    }

857
    return _vmci_family;
Ilya Kulakov's avatar
Ilya Kulakov committed
858 859 860 861
}

#endif

862 863 864 865
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;